from __future__ import annotations
import base64
from dataclasses import dataclass
from typing import Optional, Self
from lxml import objectify
from q2_sdk.tools.utils import to_bool
from .base import AuthorizationStatus, BaseAuthRequest, BaseAuthResponse, Password
[docs]
@dataclass
class Request(BaseAuthRequest):
"""
Shape that comes in from HQ when logging in.
.. code-block:: xml
<HQ request="CheckPassword" messageID="b77cb071-4cc2-4149-8f66-5db6f0f10ec9">
<LoginId>2</LoginId>
<Password Encrypted="true">tJ6HqNzIZH0iGVdmH5t6Tw==</Password>
<UserPrimaryCIF>MDcyMTIwMTA=</UserPrimaryCIF>
<CustomerPrimaryCIF>MDcyMTIwMTA=</CustomerPrimaryCIF>
<forPasswordChange>False</forPasswordChange>
<isLogon>True</isLogon>
<IsPrelogonSession>False</IsPrelogonSession>
<SessionId>f4cglgcihmr2w4ebtaz2hypj</SessionId>
</HQ>
"""
raw: objectify.Element
login_id: int
password: Password
user_primary_cif: Optional[str]
customer_primary_cif: Optional[str]
for_password_change: bool
is_logon: bool
is_prelogon_session: bool
session_id: str
@staticmethod
def from_xml(xml: objectify.Element) -> Request:
login_id = int(xml.LoginId.text)
password = Password(xml.Password.text, to_bool(xml.Password.get("Encrypted")))
user_primary_cif = None
customer_primary_cif = None
if xml.find("UserPrimaryCIF"):
user_primary_cif = base64.b64decode(xml.UserPrimaryCIF.text).decode()
if xml.find("CustomerPrimaryCIF"):
customer_primary_cif = base64.b64decode(
xml.CustomerPrimaryCIF.text
).decode()
for_password_change = to_bool(xml.forPasswordChange.text)
is_logon = to_bool(xml.isLogon.text)
is_prelogon_session = to_bool(xml.IsPrelogonSession.text)
session_id = xml.SessionId.text
return Request(
xml,
login_id,
password,
user_primary_cif,
customer_primary_cif,
for_password_change,
is_logon,
is_prelogon_session,
session_id,
)
[docs]
@dataclass
class Response(BaseAuthResponse):
"""
.. code-block:: xml
<Q2Bridge request="CheckPassword" messageID="messageID">
<Status>"Success"/"Error"</Status>
<HQErrorReturnCode>hqErrorReturnCode</HQErrorReturnCode>
<BearerToken>{0}</BearerToken>
<BearerExpiresMiliSeconds>{0}</BearerExpiresMiliSeconds>
<RefreshToken>{0}</RefreshToken>
<EndUserMessage>{0}</EndUserMessage>
<AuthorizationStatus>{0}</AuthorizationStatus>
<StatusDescription>{0}</StatusDescription>
</Q2Bridge>
"""
[docs]
@classmethod
def get_success(
cls,
bearer_token: Optional[str] = None,
bearer_expires_milli_seconds: Optional[int] = None,
refresh_token: Optional[str] = None,
exception_message: Optional[str] = None,
entitlements_were_changed: Optional[bool] = None,
) -> Self:
"""Many optional flags can be provided to the Check Password Response"""
resp = cls(cls._get_standard_auth_success_fields())
resp.add_response_field(
"authorization_status", AuthorizationStatus.PasswordIsGood
)
if bearer_token:
resp.add_response_field("bearer_token", bearer_token)
if bearer_expires_milli_seconds:
resp.add_response_field(
"bearer_expires_mili_seconds", bearer_expires_milli_seconds
)
if refresh_token:
resp.add_response_field("refresh_token", refresh_token)
if exception_message:
resp.add_response_field("exception_message", exception_message)
if entitlements_were_changed:
resp.add_response_field(
"entitlements_were_changed", entitlements_were_changed
)
return resp
[docs]
@classmethod
def get_failure(cls):
resp = cls(cls._get_standard_auth_failure_fields())
resp.add_response_field(
"authorization_status", AuthorizationStatus.PasswordIsBad
)
return resp