Source code for q2_sdk.hq.db.email

from argparse import _SubParsersAction
from functools import partial
from typing import List
from lxml.objectify import IntElement, StringElement

from q2_sdk.core.dynamic_imports import (
    api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from .audit_record import AuditRecord
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase


[docs] class EmailRow(RepresentationRowBase): EmailID: IntElement = "EmailID" UserID: IntElement = "UserID" EmailAddress: StringElement = "EmailAddress"
[docs] class Email(DbObject): REPRESENTATION_ROW_CLASS = EmailRow
[docs] def add_arguments(self, parser: _SubParsersAction): subparser = parser.add_parser("get_email_by_user") subparser.set_defaults(parser="get_email_by_user") subparser.set_defaults(func=partial(self.get_by_user, serialize_for_cli=True)) subparser.add_argument("user_id", help="Q2_User.UserID") subparser = parser.add_parser("get_email_by_id") subparser.set_defaults(parser="get_email_by_id") subparser.set_defaults(func=partial(self.get_by_id, serialize_for_cli=True)) subparser.add_argument("email_id", help="Q2_User.EmailID") subparser = parser.add_parser("get_email_primary") subparser.set_defaults(parser="get_email_primary") subparser.set_defaults(func=partial(self.get_primary, serialize_for_cli=True)) subparser.add_argument("user_id", help="Q2_User.UserID") subparser = parser.add_parser("update_email_by_id") subparser.set_defaults(parser="update_email_by_id") subparser.set_defaults(func=partial(self.update)) subparser.add_argument("email_id", help="Q2_Email.EmailID") subparser.add_argument("email", help="Q2_Email.EmailAddress") subparser = parser.add_parser("create_email") subparser.set_defaults(parser="create_email") subparser.set_defaults(func=partial(self.create)) subparser.add_argument("user_id", help="Q2_User.UserID") subparser.add_argument("email", help="Q2_Email.EmailAddress") subparser = parser.add_parser("delete_email_by_id") subparser.set_defaults(parser="delete_email_by_id") subparser.set_defaults( func=partial( self.delete, online_session=OnlineSession(), online_user=OnlineUser() ) ) subparser.add_argument("email_id", help="Q2_Email.EmailID")
[docs] async def get_by_user( self, user_id: int, serialize_for_cli=False ) -> List[EmailRow]: try: user_id = int(user_id) except ValueError as err: raise ValueError("Please supply a valid user ID") from err response = await self.call_hq( "sdk_GetEmail", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "user_id", user_id ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "action", "get" ), ]), ) if serialize_for_cli: columns = ["EmailID", "UserID", "EmailAddress"] response = self.serialize_for_cli(response, columns) return response
[docs] async def get_by_id(self, email_id: int, serialize_for_cli=False) -> List[EmailRow]: try: email_id = int(email_id) except ValueError as err: raise ValueError("Please supply a valid Email ID") from err assert isinstance(email_id, int), "Please supply a valid email id" response = await self.call_hq( "sdk_GetEmail", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "email_id", email_id ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "action", "get_id" ), ]), ) if serialize_for_cli: columns = ["EmailID", "UserID", "EmailAddress"] response = self.serialize_for_cli(response, columns) return response
[docs] async def get_primary( self, user_id: int, serialize_for_cli=False ) -> List[EmailRow]: try: user_id = int(user_id) except ValueError as err: raise ValueError("Please supply a valid user ID") from err response = await self.call_hq( "sdk_GetEmail", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "user_id", user_id ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "action", "get_primary" ), ]), ) if serialize_for_cli: columns = ["EmailID", "UserID", "EmailAddress"] response = self.serialize_for_cli(response, columns) return response
[docs] async def create(self, user_id: int, email: str): try: user_id = int(user_id) except ValueError as err: raise ValueError("Please supply a valid user ID") from err response = await self.call_hq( "sdk_AddEmail", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "user_id", user_id ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "email_address", email ), ]), ) return response
[docs] async def update(self, email_id: int, email: str): try: email_id = int(email_id) except ValueError as err: raise ValueError("Please supply a valid email ID") from err response = await self.call_hq( "sdk_UpdateEmail", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "email_id", email_id ), ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.VarChar, "email_address", email ), ]), ) return response
[docs] async def delete( self, email_id: int, online_session: OnlineSession, online_user: OnlineUser ): try: email_id = int(email_id) except ValueError as err: raise ValueError("Please supply a valid email ID") from err response = await self.call_hq( "sdk_RemoveEmail", ExecuteStoredProcedure.SqlParameters([ ExecuteStoredProcedure.SqlParam( ExecuteStoredProcedure.DataType.Int, "email_id", email_id ) ]), ) if self.hq_response.success: audit = AuditRecord(self.logger, self.hq_credentials) await audit.create( f"Email ID {email_id} deleted", online_session.session_id, workstation_id=online_session.workstation, customer_id=online_user.customer_id, user_id=online_user.user_id, user_logon_id=online_user.user_logon_id, ) return response