import json
import re
from typing import Optional, TYPE_CHECKING

from q2_sdk.core.exceptions import HqResponseError, NotExecutableError
from q2_sdk.hq.exceptions import (
    AdminTokenRequired,
    AdminTwoTokensRequired,
    CredentialsError,
    LoginPasswordStatusExpired,
)

if TYPE_CHECKING:
    from q2_sdk.core.q2_logging.logger import Q2LoggerType
# Lowest HQ build number that HqCredentials.is_zoned() reports as zoned
# (builds >= this value return True).
ZONED_BUILD_NUMBER = 5669
class HqCredentials:
    """Holds information for connecting to an HQ instance"""

    def __init__(
        self,
        hq_url: str,
        csr_user: str,
        csr_pwd: str,
        aba: str,
        auth_token: Optional[str] = None,
        reported_hq_url: Optional[str] = None,
        customer_key: Optional[str] = None,
        database_name: Optional[str] = None,
        db_schema_name: Optional[str] = None,
        env_stack: Optional[str] = None,
        db_server_name: Optional[str] = None,
    ):
        """
        :param hq_url: One HQ URL, or several separated by commas
        :param csr_user: CSR user name
        :param csr_pwd: CSR password
        :param aba: ABA value stored on the credentials
        :param auth_token: Optional token; when set, get_build_number uses the
            wedge_online_banking endpoint instead of q2_api
        :param reported_hq_url: Optional URL, stored after the same cleanup as hq_url
        :param customer_key: Optional key; a leading environment prefix is stripped
        :param database_name: Stored as-is
        :param db_schema_name: Stored as-is
        :param env_stack: Stored as-is
        :param db_server_name: Stored as-is
        """
        self.csr_user = csr_user
        self.csr_pwd = csr_pwd
        self.aba = aba
        self.auth_token = auth_token
        self.customer_key = self._clean_customer_key(customer_key)
        self.database_name = database_name
        self.db_schema_name = db_schema_name
        self.db_server_name = db_server_name
        self.env_stack = env_stack
        self.hq_url_list = self.split_hq_url_list(hq_url)
        # Index into hq_url_list of the currently active URL (see rotate_hq_url)
        self.hq_url_pointer = 0
        if reported_hq_url:
            reported_hq_url = self._clean_hq_url(reported_hq_url)
        self.reported_hq_url = reported_hq_url
        # Cache filled on the first successful get_build_number() call
        self._build_number = None

    @classmethod
    def from_dict(cls, data: dict) -> "HqCredentials":
        """
        Builds an instance from a dict keyed by attribute names.

        ``hq_url_list``, ``csr_user``, ``csr_pwd`` and ``aba`` are required;
        every other key is optional. Only the first entry of ``hq_url_list``
        is used.

        :param data: Mapping of attribute name to value
        """
        return cls(
            hq_url=data["hq_url_list"][0],
            csr_user=data["csr_user"],
            csr_pwd=data["csr_pwd"],
            aba=data["aba"],
            auth_token=data.get("auth_token"),
            reported_hq_url=data.get("reported_hq_url"),
            customer_key=data.get("customer_key"),
            database_name=data.get("database_name"),
            db_schema_name=data.get("db_schema_name"),
            # Each field is read from its own key (these two were previously crossed)
            env_stack=data.get("env_stack"),
            db_server_name=data.get("db_server_name"),
        )

    @property
    def hq_url(self):
        """The currently active HQ URL (the entry at hq_url_pointer)"""
        return self.hq_url_list[self.hq_url_pointer]

    @staticmethod
    def _clean_customer_key(customer_key):
        """Strips a leading DEV_/STG_/STAGE_/PRD_/PROD_ prefix; falsy input is returned unchanged"""
        if customer_key:
            customer_key = re.sub(r"^(DEV|STG|STAGE|PRD|PROD)_", "", customer_key)
        return customer_key

    @staticmethod
    def split_hq_url_list(hq_url_raw: str) -> list:
        """Turns a raw comma delimited HQ URL string into a list of formatted HQ URLs"""
        # A falsy input still yields one (empty) entry so hq_url stays indexable
        clean_list_of_hq_urls = [""]
        if hq_url_raw:
            cleaned_hq_url = HqCredentials._clean_hq_url(hq_url_raw)
            clean_list_of_hq_urls = [url.strip() for url in cleaned_hq_url.split(",")]
        return clean_list_of_hq_urls

    @staticmethod
    def _clean_hq_url(hq_url_raw: str) -> str:
        """Removes ``*.ashx``/``*.asmx`` file names and trailing slashes from a raw URL string"""
        cleaned_hq_url = re.sub(r"\w+\.as[hm]x", "", hq_url_raw)
        # Drop the slash left in front of each comma, then the one at the very end
        cleaned_hq_url = re.sub(r"/\s*,", ",", cleaned_hq_url).rstrip("/")
        return cleaned_hq_url

    def rotate_hq_url(self):
        """If there is a list of HQ_URLs in settings, switch active to the next one"""
        self.hq_url_pointer = (self.hq_url_pointer + 1) % len(self.hq_url_list)

    async def get_build_number(self):
        """
        Returns the HQ build number as an int, calling HQ on first use.

        The build number is the digits of the last dot-separated component of
        the ``HqVersion`` value in the response. The result is cached on the
        instance.

        :raises HqResponseError: If no valid build number can be read from the response
        """
        if not self._build_number:
            # Imported at call time rather than module load
            # (presumably to avoid a circular import -- TODO confirm)
            from q2_sdk.hq.hq_api.q2_api import GetHqVersion as q2api_version
            from q2_sdk.hq.hq_api.wedge_online_banking import (
                GetHqVersion as wob_version,
            )

            call_version = wob_version if self.auth_token else q2api_version
            hq_response = await call_version.execute(call_version.ParamsObj(None, self))
            try:
                hq_build = hq_response.result_node.get("HqVersion").split(".")[-1]
                clean_version = "".join(ch for ch in hq_build if ch.isdigit())
                self._build_number = int(clean_version)
            except (AttributeError, TypeError, ValueError) as err:
                # AttributeError covers a response without an HqVersion value
                raise HqResponseError(f"Cannot get valid build number: {err}") from err
        return self._build_number

    async def is_zoned(self) -> bool:
        """True when the HQ build number is at least ZONED_BUILD_NUMBER"""
        hq_build = await self.get_build_number()
        return hq_build >= ZONED_BUILD_NUMBER

    def get_backoffice_credentials(
        self, logger: "Q2LoggerType", user_name: str, password: str, auto_logoff=True
    ):
        """
        Returns a BackOfficeHqCredentials object usable with Q2API.BackOffice endpoints

        :param logger: Reference to calling request's logger (self.logger in your extension)
        :param user_name: Central Username
        :param password: Central Password
        :param auto_logoff: If True, session will end after the first network call
        """
        return BackOfficeHqCredentials(
            logger,
            self,
            user_name=user_name,
            password=password,
            auto_logoff=auto_logoff,
        )

    def __repr__(self):
        # NOTE: the JSON produced here contains CsrPwd in plain text
        return json.dumps(self.serialize_as_dict())

    def __eq__(self, other):
        """
        Compares connection fields with another HqCredentials.

        reported_hq_url, db_server_name and hq_url_pointer are not compared.
        """
        if not isinstance(other, HqCredentials):
            # Let Python fall back to its default comparison instead of
            # raising AttributeError on unrelated objects
            return NotImplemented
        return all([
            self.csr_user == other.csr_user,
            self.csr_pwd == other.csr_pwd,
            self.aba == other.aba,
            self.auth_token == other.auth_token,
            self.hq_url_list == other.hq_url_list,
            self.customer_key == other.customer_key,
            self.database_name == other.database_name,
            self.db_schema_name == other.db_schema_name,
            self.env_stack == other.env_stack,
        ])

    def serialize_as_dict(self):
        """Returns the credentials as a plain dict (includes the password under ``CsrPwd``)"""
        return {
            "HqUrl": self.hq_url,
            "CsrUser": self.csr_user,
            "CsrPwd": self.csr_pwd,
            "ABA": self.aba,
            "HqList": self.hq_url_list,
            "Token": self.auth_token,
            "CustomerKey": self.customer_key,
            "DbName": self.database_name,
            "DbSchemaName": self.db_schema_name,
            "DbServerName": self.db_server_name,
            "EnvStack": self.env_stack,
        }
[docs]
class BackOfficeHqCredentials(HqCredentials):
def __init__(
self,
logger: "Q2LoggerType",
hq_credentials: HqCredentials,
user_name: str | None = None,
password: str | None = None,
auto_logoff=True,
session_token=None,
):
"""
BackOffice endpoints require an CsrUser that is allowed Central access.
The user typically provided in self.hq_credentials for a RequestHandler
does not have this ability, so it is often necessary to pass a new
user_name/password combination obtained by another means.
:param logger: Reference to calling request's logger (self.logger in your extension)
:param hq_credentials: HQ Connectivity Information
:param user_name: Central Username
:param password: Central Password
:param auto_logoff: If True, session will end after the first network call
:param session_token: If a backoffice session already exists, populate it here
"""
super().__init__(
hq_credentials.hq_url,
hq_credentials.csr_user,
hq_credentials.csr_pwd,
hq_credentials.aba,
auth_token=hq_credentials.auth_token,
reported_hq_url=hq_credentials.reported_hq_url,
customer_key=hq_credentials.customer_key,
)
self.base_hq_credentials = hq_credentials
self.logger = logger
self.csr_user = user_name
self.csr_pwd = password
self.auto_logoff = auto_logoff
self._cookie = session_token
async def get_cookie(self):
if not self._cookie:
from q2_sdk.hq.hq_api.backoffice import LogonAdmin
params_obj = LogonAdmin.ParamsObj(
self.logger,
self.base_hq_credentials,
self.hq_url,
self.csr_user,
self.csr_pwd,
)
response = await LogonAdmin.execute(params_obj)
if response.success:
self._cookie = response.headers["Set-Cookie"]
else:
error_msg = f"Unable to connect to Hq Backoffice with given credentials. username: {self.csr_user}"
match response.error_code:
case -95 | -97:
raise LoginPasswordStatusExpired(error_msg)
case -666:
raise AdminTokenRequired(error_msg)
case -665:
raise AdminTwoTokensRequired(error_msg)
case _:
raise CredentialsError(error_msg)
return self._cookie
async def logoff(self):
from q2_sdk.hq.hq_api.backoffice import Logoff
assert self._cookie, "Cannot logoff without a cookie (call self.get_cookie())"
params_obj = Logoff.ParamsObj(self.logger, self)
response = await Logoff.execute(params_obj)
if response.success:
self._cookie = None
return response
[docs]
def get_backoffice_credentials(
self, logger: "Q2LoggerType", user_name: str, password: str, auto_logoff=True
):
"""
This function will error when called on BackOfficeHqCredentials
"""
raise NotExecutableError(
"Already an instance of BackOfficeHqCredentials! Just use this object."
)