Source code for q2_sdk.hq.db.customer

from argparse import _SubParsersAction
import asyncio
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from typing import List, Optional

from dateutil import parser as datetime_parser
from lxml import objectify
from lxml.etree import tostring
from lxml.objectify import BoolElement, E, IntElement, StringElement
from q2_sdk.core.cli.textui import colored, puts
from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.core.exceptions import MoveGroupError
from q2_sdk.hq.db.policy_data import Entity, PolicyData
from q2_sdk.hq.exceptions import CredentialsError
from q2_sdk.hq.hq_api.q2_api import DeleteCustomer
from q2_sdk.hq.models.hq_params.stored_procedure import Param
from q2_sdk.hq.models.policy_data import PolicyData as PolicyDataModel
from q2_sdk.models.demographic import Address

from .db_object import DbObject
from .representation_row_base import RepresentationRowBase

D_TYPES = ExecuteStoredProcedure.DataType


[docs] @dataclass class DeleteCustomerResponse: success: bool error_message: None
[docs] @dataclass class GetCustomerParams: customer_id: Optional[int] = None login_name: Optional[str] = None tax_id: Optional[str] = None primary_cif: Optional[str] = None zone_id: Optional[int] = None combine: Optional[bool] = False
[docs] @dataclass class UpdateCustomerValues: customer_id: int customer_name: Optional[str] = None tax_id: Optional[str] = None primary_cif: Optional[str] = None is_company: Optional[bool] = None charge_account: Optional[int] = None charge_account_valid: bool = True
[docs] def is_valid(self): parameters = [ self.customer_name, self.tax_id, self.primary_cif, self.is_company, self.charge_account, ] return any(parameters) or self.is_company is not None
[docs] def build_update_parameters(self): parameters = [] possible_params = [ Param(self.customer_id, D_TYPES.Int, "customer_id"), Param(self.tax_id, D_TYPES.VarChar, "tax_id", True), Param(self.primary_cif, D_TYPES.VarChar, "primary_cif", True), Param(self.customer_name, D_TYPES.VarChar, "customer_name", True), Param(self.is_company, D_TYPES.Bit, "is_company", True), Param(self.charge_account, D_TYPES.Int, "charge_account", True), ] for item in possible_params: item.add_to_param_list(parameters) return parameters
[docs] async def validate_charge_account(self, logger, hq_creds): policy_data = PolicyData(logger, hq_credentials=hq_creds) customer_policy = await policy_data.get(Entity.Customer, self.customer_id) if not customer_policy.Customer.AccountIdMapping: self.charge_account_valid = False return account = customer_policy.Customer.AccountIdMapping.get( int(self.charge_account) ) if not account: self.charge_account_valid = False access = account.CustomerAccess if not access or int(access) < 4: self.charge_account_valid = False
[docs] @dataclass class SearchCustomerValues: customername: Optional[str] = None taxid: Optional[str] = None primarycif: Optional[str] = None
[docs] def valid_inputs(self): if self.customername or self.taxid or self.primarycif: return True return False
[docs] @dataclass class SearchCustomerResponse: success: bool result: Optional[list] = None error_message: Optional[str] = None
[docs] class GroupMoverResponse(Exception): def __init__( self, customer_ids: list[int], success: bool, reason: str = "", omitted: bool = False, ): super().__init__() self.customer_ids = customer_ids self.success = success self.reason = reason self.omitted = omitted
[docs] class CustomerRow(RepresentationRowBase): CustomerID: IntElement = "CustomerID" CustomerName: StringElement = "CustomerName" TaxID: StringElement = "TaxID" IsCompany: BoolElement = "IsCompany" ServiceChargePlanID: IntElement = "ServiceChargePlanID" ChargeAccount: IntElement = "ChargeAccount" CreateDate: StringElement = "CreateDate" DefaultAddressID: IntElement = "DefaultAddressID" AutoGenerated: BoolElement = "AutoGenerated" DeletedDate: StringElement = "DeletedDate" GroupID: IntElement = "GroupID" ChargePlanStartDate: StringElement = "ChargePlanStartDate" CustInfo: StringElement = "CustInfo" PrimaryCIF: StringElement = "PrimaryCIF" PolicyID: IntElement = "PolicyID" CompanyPolicyID: IntElement = "CompanyPolicyID" HostUser: StringElement = "HostUser" HostPwd: StringElement = "HostPwd"
[docs] class GeneratedTransactionsPending(RepresentationRowBase): CustomerID: IntElement = "CustomerID" TransactionID: IntElement = "TransactionID" TransactionStatus: StringElement = "TransactionStatus" TransactionType: StringElement = "TransactionType" Description: StringElement = "Description"
[docs] class Customer(DbObject): """ Queries the Q2_Customer table for rows that match the given arguments. Gathers additional details on specific customers """ REPRESENTATION_ROW_CLASS = CustomerRow
[docs] def get_columns(self, extended: bool): columns = ["CustomerID", "CustomerName", "TaxID", "PrimaryCIF", "GroupID"] if extended: columns.extend([ "CreateDate", "DefaultAddressID", "AutoGenerated", "DeletedDate", "CustInfo", "PolicyID", "CompanyPolicyID", "HostUser", "HostPwd", ]) return columns
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_customers") subparser.set_defaults(parser="get_customers") subparser.set_defaults(func=partial(self.get, serialize_for_cli=True)) subparser.add_argument( "--extended", action="store_true", help="Extends the output of rows" ) subparser.add_argument( "--combine", action="store_true", help="Search by both ssn and cif" ) group = subparser.add_mutually_exclusive_group() group.add_argument("-c", "--customer-id", help="Q2_Customer.CustomerID") group.add_argument("-l", "--login-name", help="Q2_UserLogin.LoginName") subparser.add_argument("-t", "--tax-id", help="Q2_Customer.TaxID") subparser.add_argument("-p", "--primary-cif", help="Q2_Customer.PrimaryCIF") subparser = parser.add_parser("get_logins_by_customer") subparser.set_defaults(parser="get_logins_by_customer") subparser.set_defaults( func=partial(self.get_logins_by_customer, serialize_for_cli=True) ) subparser.add_argument("customer_id", help="Q2_Customer.CustomerID") subparser = parser.add_parser("get_deleted_customers") subparser.description = ( "Returns all customers deleted since the beginning of the current day" ) subparser.set_defaults(parser="get_deleted_customers") subparser.set_defaults(func=partial(self.get_deleted, serialize_for_cli=True)) subparser.add_argument( "-d", "--deleted-after", type=datetime_parser.parse, default=datetime.today(), help=""" Minimum Q2_Customer.DeletedDate to search. (Defaults to beginning of current day if not provided). """, ) subparser.add_argument( "--extended", action="store_true", help="Extends the output of rows" ) subparser = parser.add_parser("search_customers") subparser.set_defaults(parser="search_customers") subparser.set_defaults(func=partial(self.search, serialize_for_cli=True)) subparser.add_argument("-c", "--customer-name", help="Q2_Customer.CustomerID") subparser.add_argument("-t", "--tax-id", help="Q2_Customer.TaxID") subparser.add_argument("-p", "--primary-cif", help="Q2_Customer.PrimaryCIF") subparser.add_argument( "--extended", action="store_true", help="Extends the output of the rows" ) subparser = parser.add_parser("get_customers_created_after") subparser.description = ( "Returns all customers created since the beginning of the current day" ) subparser.set_defaults(parser="search_by_create_date") subparser.set_defaults( func=partial(self.search_by_create_date, serialize_for_cli=True) ) subparser.add_argument( "-c", "--created-after", type=datetime_parser.parse, default=datetime.today(), help=""" Minimum Q2_Customer.CreatedBy to search. (Defaults to beginning of current day if not provided). """, ) subparser.add_argument( "--extended", action="store_true", help="Extends the output of rows" ) subparser = parser.add_parser("move_customer_group") subparser.set_defaults(parser="move_customer_group") subparser.set_defaults(func=partial(self.move_group, serialize_for_cli=True)) subparser.add_argument("customer_id", type=int, help="Q2_Customer.CustomerID") subparser.add_argument("new_group_id", type=int, help="Q2_Group.GroupID") subparser = parser.add_parser("remove_customer") subparser.set_defaults(parser="remove_customer") subparser.set_defaults(func=partial(self.delete), serialize_for_cli=True) subparser.add_argument("customer_id", help="Q2_Customer.CustomerID") subparser = parser.add_parser("update_customer") subparser.set_defaults(parser="update_customer") subparser.set_defaults(func=partial(self.update)) subparser.add_argument("customer_id", type=int, help="Q2_Customer.CustomerID") subparser.add_argument("-n", "--customer-name", help="Q2_Customer.CustomerName") subparser.add_argument("-t", "--tax-id", help="Q2_Customer.TaxID") subparser.add_argument("-p", "--primary-cif", help="Q2_Customer.PrimaryCif") subparser.add_argument( "-c", "--is-company", action="store_true", default=None, dest="is_company", help="Q2_Customer.IsCompany", ) subparser.add_argument( "-r", "--is-retail", action="store_false", default=None, dest="is_company", help="Q2_Customer.IsCompany", ) subparser.add_argument( "-a", "--charge-account", type=int, help="Q2_Customer.ChargeAccount" )
[docs] async def get_deleted( self, deleted_after: datetime, serialize_for_cli=False, extended=False ) -> List[CustomerRow]: params = [] Param( deleted_after.isoformat(), D_TYPES.Date, "deleted_after" ).add_to_param_list(params) response = await self.call_hq( "sdk_GetDeletedCustomers", ExecuteStoredProcedure.SqlParameters(params) ) if serialize_for_cli: response = self.serialize_for_cli(response, self.get_columns(extended)) return response
[docs] async def get( self, customer_id: Optional[int] = None, login_name: Optional[str] = None, tax_id: Optional[str] = None, primary_cif: Optional[str] = None, zone_id: Optional[int] = None, extended=False, combine=False, serialize_for_cli=False, ) -> List[CustomerRow]: if combine: assert sum([x is not None for x in (tax_id, primary_cif)]) == 2, ( "If combine then both tax id and primary cif must be provided" ) else: assert ( sum([ x is not None for x in (customer_id, login_name, tax_id, primary_cif) ]) == 1 ), "Either customer_id, login_name, tax_id, or primary_cif must be defined" params = self._build_get_parameters( GetCustomerParams( customer_id, login_name, tax_id, primary_cif, zone_id, combine ) ) response = await self.call_hq( "sdk_GetCustomers", ExecuteStoredProcedure.SqlParameters(params) ) if serialize_for_cli: response = self.serialize_for_cli(response, self.get_columns(extended)) return response
[docs] async def get_many(self, customer_ids: list[int]) -> list[CustomerRow]: """ Retrieves multiple customers based on provided IDs :param customer_ids: list of Q2_Customer.CustomerID to retrieve """ # <request><get c="1" /><get c="2" /><get c="3" /></request> get_customers_list = [E.get(c=str(cust_id)) for cust_id in customer_ids] get_request_xml = E.request(*get_customers_list) get_request = tostring(get_request_xml, encoding="utf-8") params = [] Param(get_request.decode(), D_TYPES.Xml, "request").add_to_param_list(params) return await self.call_hq( "sdk_GetMultipleCustomers", ExecuteStoredProcedure.SqlParameters(params) )
[docs] async def get_logins_by_customer(self, customer_id: int, serialize_for_cli=False): params = [] Param(customer_id, D_TYPES.Int, "customer_id").add_to_param_list(params) response = await self.call_hq( "sdk_GetLoginsUnderCustomer", ExecuteStoredProcedure.SqlParameters(params) ) if serialize_for_cli: columns = [ "CustomerID", "UserID", "UserLogonID", "LoginName", "LoginStatus", ] response = self.serialize_for_cli(response, columns) return response
[docs] async def delete( self, customer_id: int, serialize_for_cli=False ) -> DeleteCustomerResponse: if not self.hq_credentials.csr_user: raise CredentialsError("csr user was not provided") if self.hq_credentials.auth_token: raise CredentialsError("authentication token has been provided") response = await DeleteCustomer.execute( DeleteCustomer.ParamsObj( self.logger, customer_id, hq_credentials=self.hq_credentials ) ) if not serialize_for_cli: return DeleteCustomerResponse( success=response.success, error_message=response.error_message )
[docs] async def get_pending_by_customer( self, customer_id_list: list[int], omit_transaction_type_ids: Optional[list[int]] = None, ) -> list[GeneratedTransactionsPending]: customer_nodes = [E.id(c=str(x)) for x in customer_id_list] omit_nodes = [] if omit_transaction_type_ids: omit_nodes = [E.compatibleTT(id=x) for x in omit_transaction_type_ids] root_node = E.request(*customer_nodes, *omit_nodes) xml_string = tostring(root_node, encoding="utf-8").decode() sql_params = [] Param(xml_string, D_TYPES.Xml, "customer_xml").add_to_param_list(sql_params) return await self.call_hq( "sdk_GetPendingTransactionsPerCustomer", ExecuteStoredProcedure.SqlParameters(sql_params), )
[docs] async def move_many_to_group( self, customer_ids: list[int], new_group_id: int, omit_pending=False ) -> list[GroupMoverResponse]: """ Allows for multiple customers to be moved to a different group. Moving customers are done in batches, and if a customer in a batch fails for some reason, the whole batch fails. Each batch will have a response object. If customers are omitted due to pending transactions (omit_pending=True), a response object will also be returned with any omissions. There are some caveats to be aware of when using this method: - Moving to or from Commercial (Treasury) groups is not allowed - Moving customers to different zones is not allowed - If a customer has pending transactions, the code will attempt to check the limits of the new group. If the new groups transaction limits are lower than the current group, the move will fail and the pending transaction will need to be addressed before the move can be attempted again. Will return a list of GroupMoverResponse objects :param customer_ids: list of Q2_Customer.CustomerID to be moved :param new_group_id: Q2_Group.GroupID to move to :param serialize_for_cli: Used when running from the command line :param omit_pending: Omit customer ids from move process if they have pending transactions that may violate limits of target group """ assert isinstance(customer_ids, list) assert isinstance(new_group_id, int) customer_list = await self.get_many(customer_ids) if not customer_list: raise ValueError("Customer(s) not found") response, valid_ids = self._sort_get_customers( customer_list=customer_list, new_group_id=new_group_id, logger=self.logger ) group_ids = set([x.GroupID for x in customer_list]) if len(group_ids) > 1: raise MoveGroupError("Multiple Original Groups") old_group_id = group_ids.pop() compatible_transaction_type_ids = await self._compare_group_transaction_limits( old_group_id, new_group_id ) omit_ids = [] if omit_pending: pending_transactions = await self.get_pending_by_customer( valid_ids, omit_transaction_type_ids=compatible_transaction_type_ids ) omit_ids = [x.CustomerID.pyval for x in pending_transactions] valid_ids = [x for x in valid_ids if x not in omit_ids] update_ids = [E.update(c=str(id)) for id in valid_ids] compatible_xml = [E.compatibleTT(id=x) for x in compatible_transaction_type_ids] max_bucket_size = 100 for x in range(0, len(valid_ids), max_bucket_size): params = [] request_xml = E.request( *update_ids[x : x + max_bucket_size], *compatible_xml, NewGroupID=str(new_group_id), ) request = tostring(request_xml, encoding="utf-8") Param(request, D_TYPES.Xml, "request").add_to_param_list(params) self.logger.info( f"Attempting to move CustomerID(s) {customer_ids} to group {new_group_id}" ) await self.call_hq( "sdk_MoveCustomerGroup", ExecuteStoredProcedure.SqlParameters(params), use_json=False, ) full_result = self.hq_response.result_node.Data.NewDataSet.Table.Column1 inner_result = objectify.fromstring(full_result.text) error_code = int(inner_result.Result.get("StatusCode", -1)) response += self._get_move_many_status( valid_ids[x : x + max_bucket_size], error_code, inner_result, logger=self.logger, ) if omit_ids: response.append( GroupMoverResponse( omit_ids, False, reason="Omitted for pending transactions", omitted=True, ) ) return response
@staticmethod def _sort_get_customers(customer_list, new_group_id, logger): # sorts customer list based on deleted customers and # customers that are already in group that we want to migrate to deleted_customers = [] currently_migrated = [] valid_ids = [] response = [] for customer in customer_list: if customer.GroupID == new_group_id: currently_migrated.append(customer.CustomerID) continue if hasattr(customer, "DeletedDate") and customer.DeletedDate: deleted_customers.append(customer.CustomerID) continue valid_ids.append(customer.CustomerID) if deleted_customers: response.append( GroupMoverResponse( customer_ids=deleted_customers, success=False, reason="Cannot move a deleted customer", ) ) logger.debug(f"Deleted customers {deleted_customers} found.") if currently_migrated: response.append( GroupMoverResponse( customer_ids=currently_migrated, success=False, reason=f"Customers are already in GroupID {new_group_id}", ) ) logger.debug( f"Customers {currently_migrated} are already in GroupID {new_group_id}." ) return response, valid_ids @staticmethod def _get_move_many_status(customer_ids, error_code, inner_result, logger): response = [] match error_code: # noqa: E999 case 0: logger.info(f"Successfully migrated customers {customer_ids}") response.append( GroupMoverResponse( customer_ids=customer_ids, success=True, reason="Successfully migrated customers", ) ) case -1: msg = inner_result.Result.get("Message") logger.debug(f"Failed to migrate customers {customer_ids}. {msg}") response.append( GroupMoverResponse( customer_ids=customer_ids, success=False, reason=msg ) ) case -2: msg = "Error: One or more customer ids belong to different groups" logger.debug(f"Failed to migrate customers {customer_ids}. {msg}") response.append( GroupMoverResponse( customer_ids=customer_ids, success=False, reason=msg ) ) case -3: msg = "Error: One or more customer ids contained a pending transaction" additional_error_details = inner_result.Result.get( "CustomersWithTransactions" ) if additional_error_details: msg += additional_error_details logger.debug(f"Failed to migrate customers {customer_ids}. {msg}") response.append( GroupMoverResponse( customer_ids=customer_ids, success=False, reason=msg ) ) case -4: msg = "Error: One or more customer id groups did not have matching Zone ids" logger.debug(f"Failed to migrate customers {customer_ids}. {msg}") response.append( GroupMoverResponse( customer_ids=customer_ids, success=False, reason=msg ) ) return response
[docs] async def move_group( self, customer_id: int, new_group_id: int, serialize_for_cli=False ) -> int: """ Move a customer to another group. There are some caveats to be aware of when using this method: - Moving to or from Commercial (Treasury) groups is not allowed - Moving customers to different zones is not allowed - If a customer has pending transactions, the code will attempt to check the limits of the new group. If the new groups transaction limits are lower than the current group, the move will fail and the pending transaction will need to be addressed before the move can be attempted again. If successful, will return the same customer id If unsuccessful, will raise a DatabaseDataError. :param customer_id: Q2_Customer.CustomerID to be moved :param new_group_id: Q2_Group.GroupID to move to :param serialize_for_cli: Used when running from the command line """ assert isinstance(customer_id, int) assert isinstance(new_group_id, int) self.logger.info("Moving CustomerID %d to Group %d", customer_id, new_group_id) fresh_db_obj = Customer( self.logger, hq_credentials=self.hq_credentials, ret_table_obj=True ) existing_customer = (await fresh_db_obj.get(customer_id=customer_id))[0] if ( hasattr(existing_customer, "DeletedDate") and existing_customer.DeletedDate is not None ): raise MoveGroupError("Cannot move a deleted customer") old_group_id = existing_customer.GroupID if new_group_id == old_group_id: raise MoveGroupError("Customer is already in GroupID %s" % new_group_id) compatible_transaction_type_ids = await self._compare_group_transaction_limits( old_group_id, new_group_id ) compatible_xml = [E.compatibleTT(id=x) for x in compatible_transaction_type_ids] # <request NewGroupID="1"><update c="1" /><update c="2" /><update c="3" /></request> request_xml = E.request( E.update(c=str(customer_id)), *compatible_xml, NewGroupID=str(new_group_id) ) request = tostring(request_xml, encoding="utf-8") params = [] Param(request, D_TYPES.Xml, "request").add_to_param_list(params) await self.call_hq( "sdk_MoveCustomerGroup", ExecuteStoredProcedure.SqlParameters(params), use_json=False, ) full_result = self.hq_response.result_node.Data.NewDataSet.Table.Column1 inner_result = objectify.fromstring(full_result.text) error_code = int(inner_result.Result.get("StatusCode", -1)) if error_code != 0: self.logger.error("An error occurred when moving customer") raise MoveGroupError(error_code) if serialize_for_cli: puts(colored.yellow(f"Customer has moved to new group {new_group_id}")) return return customer_id
[docs] async def update( self, customer_id: int, customer_name=None, tax_id=None, primary_cif=None, is_company: Optional[bool] = None, charge_account: Optional[int] = None, ): assert isinstance(customer_id, int), "Please provide a valid customer id" update_values = UpdateCustomerValues(*[ customer_id, customer_name, tax_id, primary_cif, is_company, charge_account, ]) assert update_values.is_valid(), "No values to update" if update_values.charge_account: await update_values.validate_charge_account( self.logger, self.hq_credentials ) assert update_values.charge_account_valid, ( "Charge account is not valid for this customer" ) sql_params = update_values.build_update_parameters() response = await self.call_hq( "sdk_UpdateCustomer", ExecuteStoredProcedure.SqlParameters(sql_params) ) return response
@staticmethod def _build_get_parameters(get_info: GetCustomerParams): parameters = [] possible_params = [] if get_info.combine: possible_params.extend([ Param(get_info.tax_id, D_TYPES.VarChar, "tax_id"), Param(get_info.primary_cif, D_TYPES.VarChar, "primary_cif"), ]) elif get_info.customer_id: possible_params.append( Param(get_info.customer_id, D_TYPES.Int, "customer_id") ) elif get_info.login_name: possible_params.append( Param(get_info.login_name, D_TYPES.VarChar, "login_name") ) elif get_info.tax_id: possible_params.append(Param(get_info.tax_id, D_TYPES.VarChar, "tax_id")) else: possible_params.append( Param(get_info.primary_cif, D_TYPES.VarChar, "primary_cif") ) if get_info.zone_id: possible_params.append( Param(get_info.zone_id, D_TYPES.Int, "zone_id", True) ) for item in possible_params: item.add_to_param_list(parameters) return parameters
[docs] async def search_by_create_date( self, created_after: datetime, serialize_for_cli: bool = False, extended: bool = False, group_id: Optional[int] = None, filter_out_deleted: bool = False, page_number: Optional[int] = None, page_size: Optional[int] = None, ) -> List[CustomerRow]: params = [] Param( created_after.isoformat(), D_TYPES.DateTime, "created_after" ).add_to_param_list(params) stored_proc_name = "sdk_GetCustomersByCreateDate" if group_id: Param(group_id, D_TYPES.Int, "group_id").add_to_param_list(params) if filter_out_deleted: Param( filter_out_deleted, D_TYPES.Bit, "filter_out_deleted" ).add_to_param_list(params) if page_number and page_size: assert int(page_number), "page parameter must be an integer" offset = (page_number - 1) * page_size assert offset >= 0, "page_number mush be 1 or greater" assert int(page_size), "page_size must be an integer" Param(offset, D_TYPES.Int, "offset").add_to_param_list(params) Param(page_size, D_TYPES.Int, "returnCount").add_to_param_list(params) stored_proc_name = "sdk_GetCustomersByCreateDatePaginated" response = await self.call_hq( stored_proc_name, ExecuteStoredProcedure.SqlParameters(params) ) if serialize_for_cli: response = self.serialize_for_cli(response, self.get_columns(extended)) return response
async def _compare_group_transaction_limits( self, old_group_id, new_group_id ) -> list[str]: policy_obj = PolicyData(self.logger, hq_credentials=self.hq_credentials) calls = [ asyncio.create_task( policy_obj.get(Entity.Group, old_group_id), name="old_policy_call" ), asyncio.create_task( policy_obj.get(Entity.Group, new_group_id), name="new_policy_call" ), ] await asyncio.gather(*calls) old_group_policy = calls[0].result() new_group_policy = calls[1].result() return self._check_limits(old_group_policy, new_group_policy) @staticmethod def _check_limits( old_group_policy: PolicyDataModel, new_group_policy: PolicyDataModel ) -> list[str]: compatible_transaction_type_ids = [] for transaction in old_group_policy.Group.GeneratedTransactionRights: new_group_transaction = ( new_group_policy.Group.TransactionRightsShortNameMapping.get( transaction.ShortName ) ) if ( new_group_transaction is not None and new_group_transaction >= transaction ): compatible_transaction_type_ids.append( str(new_group_transaction.TransactionTypeID) ) return compatible_transaction_type_ids
[docs] async def search( self, customer_name: str = "", tax_id: str = "", primary_cif: str = "", serialize_for_cli=False, extended=False, ): search_param = SearchCustomerValues( customername=customer_name, taxid=tax_id, primarycif=primary_cif ) if not search_param.valid_inputs(): return SearchCustomerResponse( success=False, error_message="Invalid parameters. Accepts any combination of " "[CustomerName, TaxID and PrimaryCIF]", ) sql_params = [] if search_param.customername: Param( search_param.customername, D_TYPES.VarChar, "customername" ).add_to_param_list(sql_params) if search_param.taxid: Param(search_param.taxid, D_TYPES.VarChar, "taxid").add_to_param_list( sql_params ) if search_param.primarycif: Param( search_param.primarycif, D_TYPES.VarChar, "primarycif" ).add_to_param_list(sql_params) results = await self.call_hq( "CaliperAPI_SearchCustomers", ExecuteStoredProcedure.SqlParameters(sql_params), ) if not results: return SearchCustomerResponse(success=False, error_message="No users found") if serialize_for_cli: return self.serialize_for_cli(results, self.get_columns(extended)) return SearchCustomerResponse(success=True, result=results)
[docs] async def update_customer_address(self, customer_id: int, address: Address): """ :param customer_id: the CustomerID of the customer you would like to update :param address: the address object containing the customer's new address information :return `MessageCount` field. If value returned is `1`, update was successful """ assert isinstance(customer_id, int), "Please provide a valid customer id" assert isinstance(address, Address), "Please provide a valid address" params = [] Param(customer_id, D_TYPES.Int, "CustomerID").add_to_param_list(params) Param(address.country, D_TYPES.Char, "CountryCode").add_to_param_list(params) Param(address.address_1, D_TYPES.VarChar, "Address1").add_to_param_list(params) Param(address.address_2, D_TYPES.VarChar, "Address2").add_to_param_list(params) Param(address.state, D_TYPES.VarChar, "State").add_to_param_list(params) Param(address.zipcode, D_TYPES.VarChar, "Zipcode").add_to_param_list(params) Param(address.city, D_TYPES.VarChar, "City").add_to_param_list(params) Param(address.province, D_TYPES.VarChar, "Province").add_to_param_list(params) Param(address.address_type, D_TYPES.VarChar, "AddressType").add_to_param_list( params ) response = await self.call_hq( "sdk_UpdateCustomerAddress", ExecuteStoredProcedure.SqlParameters(params) ) return response