from argparse import _SubParsersAction
from functools import partial
from typing import List
from lxml.objectify import IntElement, StringElement
from q2_sdk.core.dynamic_imports import (
api_ExecuteStoredProcedure as ExecuteStoredProcedure,
)
from q2_sdk.hq.models.online_session import OnlineSession
from q2_sdk.hq.models.online_user import OnlineUser
from .audit_record import AuditRecord
from .db_object import DbObject
from .representation_row_base import RepresentationRowBase
class EmailRow(RepresentationRowBase):
    """Row representation of a single email record.

    Declares three fields: ``EmailID``, ``UserID`` and ``EmailAddress``.
    """

    # Each field is annotated with an lxml objectify element type and assigned
    # its own name as a string.
    # NOTE(review): presumably RepresentationRowBase uses that string to look
    # the field up on the underlying row -- confirm against the base class.
    EmailID: IntElement = "EmailID"
    UserID: IntElement = "UserID"
    EmailAddress: StringElement = "EmailAddress"
class Email(DbObject):
    """Database object for email records, backed by HQ stored procedures.

    Reads go through ``sdk_GetEmail``; writes go through ``sdk_AddEmail``,
    ``sdk_UpdateEmail`` and ``sdk_RemoveEmail``. Every public operation is
    also exposed as a CLI subcommand by :meth:`add_arguments`.
    """

    REPRESENTATION_ROW_CLASS = EmailRow

    def add_arguments(self, parser: _SubParsersAction):
        """Register one subcommand per email operation on ``parser``.

        Each subparser stores its own name under ``parser`` and the callable
        to run under ``func``. The read commands pre-bind
        ``serialize_for_cli=True``.

        :param parser: Sub-parsers action to which the subcommands are added.
        """
        subparser = parser.add_parser("get_email_by_user")
        subparser.set_defaults(parser="get_email_by_user")
        subparser.set_defaults(func=partial(self.get_by_user, serialize_for_cli=True))
        subparser.add_argument("user_id", help="Q2_User.UserID")

        subparser = parser.add_parser("get_email_by_id")
        subparser.set_defaults(parser="get_email_by_id")
        subparser.set_defaults(func=partial(self.get_by_id, serialize_for_cli=True))
        # The email id lives on Q2_Email, matching the help text of the
        # update/delete subcommands below.
        subparser.add_argument("email_id", help="Q2_Email.EmailID")

        subparser = parser.add_parser("get_email_primary")
        subparser.set_defaults(parser="get_email_primary")
        subparser.set_defaults(func=partial(self.get_primary, serialize_for_cli=True))
        subparser.add_argument("user_id", help="Q2_User.UserID")

        subparser = parser.add_parser("update_email_by_id")
        subparser.set_defaults(parser="update_email_by_id")
        subparser.set_defaults(func=partial(self.update))
        subparser.add_argument("email_id", help="Q2_Email.EmailID")
        subparser.add_argument("email", help="Q2_Email.EmailAddress")

        subparser = parser.add_parser("create_email")
        subparser.set_defaults(parser="create_email")
        subparser.set_defaults(func=partial(self.create))
        subparser.add_argument("user_id", help="Q2_User.UserID")
        subparser.add_argument("email", help="Q2_Email.EmailAddress")

        subparser = parser.add_parser("delete_email_by_id")
        subparser.set_defaults(parser="delete_email_by_id")
        # delete() needs a session and a user for its audit record; the CLI
        # binds default-constructed instances at registration time.
        subparser.set_defaults(
            func=partial(
                self.delete, online_session=OnlineSession(), online_user=OnlineUser()
            )
        )
        subparser.add_argument("email_id", help="Q2_Email.EmailID")

    @staticmethod
    def _coerce_id(value, message: str) -> int:
        """Return ``value`` converted with ``int()``, or raise ``ValueError(message)``.

        CLI arguments arrive as strings, so every public method normalises its
        id through this helper before building SQL parameters.

        :param value: The raw id (``int``, numeric string, ...).
        :param message: Text of the ``ValueError`` raised when conversion fails.
        :raises ValueError: If ``value`` cannot be converted to ``int``.
        """
        try:
            return int(value)
        except (TypeError, ValueError) as err:
            # int(None) raises TypeError rather than ValueError; surface both
            # the same way so callers only have one exception to handle.
            raise ValueError(message) from err

    async def _get_emails(
        self, id_param: str, id_value: int, action: str, serialize_for_cli: bool
    ):
        """Call ``sdk_GetEmail`` with one integer id parameter and an action.

        :param id_param: Name of the integer SQL parameter (``"user_id"`` or
            ``"email_id"``).
        :param id_value: Value for that parameter.
        :param action: Value of the ``action`` VarChar parameter.
        :param serialize_for_cli: When true, pass the response through
            ``self.serialize_for_cli`` with the EmailID/UserID/EmailAddress
            columns before returning it.
        :return: The ``call_hq`` response, or its CLI serialization.
        """
        response = await self.call_hq(
            "sdk_GetEmail",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, id_param, id_value
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar, "action", action
                ),
            ]),
        )
        if serialize_for_cli:
            columns = ["EmailID", "UserID", "EmailAddress"]
            response = self.serialize_for_cli(response, columns)
        return response

    async def get_by_user(
        self, user_id: int, serialize_for_cli=False
    ) -> List[EmailRow]:
        """Fetch email rows for a user (``sdk_GetEmail`` with action ``get``).

        :param user_id: User id; anything ``int()`` accepts.
        :param serialize_for_cli: When true, return the result of
            ``self.serialize_for_cli`` instead of the raw response.
        :raises ValueError: If ``user_id`` cannot be converted to ``int``.
        """
        user_id = self._coerce_id(user_id, "Please supply a valid user ID")
        return await self._get_emails("user_id", user_id, "get", serialize_for_cli)

    async def get_by_id(self, email_id: int, serialize_for_cli=False) -> List[EmailRow]:
        """Fetch an email by its id (``sdk_GetEmail`` with action ``get_id``).

        :param email_id: Email id; anything ``int()`` accepts.
        :param serialize_for_cli: When true, return the result of
            ``self.serialize_for_cli`` instead of the raw response.
        :raises ValueError: If ``email_id`` cannot be converted to ``int``.
        """
        email_id = self._coerce_id(email_id, "Please supply a valid Email ID")
        return await self._get_emails(
            "email_id", email_id, "get_id", serialize_for_cli
        )

    async def get_primary(
        self, user_id: int, serialize_for_cli=False
    ) -> List[EmailRow]:
        """Fetch a user's primary email (``sdk_GetEmail`` with action ``get_primary``).

        :param user_id: User id; anything ``int()`` accepts.
        :param serialize_for_cli: When true, return the result of
            ``self.serialize_for_cli`` instead of the raw response.
        :raises ValueError: If ``user_id`` cannot be converted to ``int``.
        """
        user_id = self._coerce_id(user_id, "Please supply a valid user ID")
        return await self._get_emails(
            "user_id", user_id, "get_primary", serialize_for_cli
        )

    async def create(self, user_id: int, email: str):
        """Add an email address for a user via ``sdk_AddEmail``.

        :param user_id: User id; anything ``int()`` accepts.
        :param email: Email address, sent as the ``email_address`` parameter.
        :return: The ``call_hq`` response.
        :raises ValueError: If ``user_id`` cannot be converted to ``int``.
        """
        user_id = self._coerce_id(user_id, "Please supply a valid user ID")
        response = await self.call_hq(
            "sdk_AddEmail",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, "user_id", user_id
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar, "email_address", email
                ),
            ]),
        )
        return response

    async def update(self, email_id: int, email: str):
        """Change the address of an existing email via ``sdk_UpdateEmail``.

        :param email_id: Email id; anything ``int()`` accepts.
        :param email: New address, sent as the ``email_address`` parameter.
        :return: The ``call_hq`` response.
        :raises ValueError: If ``email_id`` cannot be converted to ``int``.
        """
        email_id = self._coerce_id(email_id, "Please supply a valid email ID")
        response = await self.call_hq(
            "sdk_UpdateEmail",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, "email_id", email_id
                ),
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.VarChar, "email_address", email
                ),
            ]),
        )
        return response

    async def delete(
        self, email_id: int, online_session: OnlineSession, online_user: OnlineUser
    ):
        """Remove an email via ``sdk_RemoveEmail`` and audit the deletion.

        An audit record is created only when ``self.hq_response.success`` is
        truthy after the stored-procedure call.

        :param email_id: Email id; anything ``int()`` accepts.
        :param online_session: Supplies ``session_id`` and ``workstation`` for
            the audit record.
        :param online_user: Supplies ``customer_id``, ``user_id`` and
            ``user_logon_id`` for the audit record.
        :return: The ``call_hq`` response.
        :raises ValueError: If ``email_id`` cannot be converted to ``int``.
        """
        email_id = self._coerce_id(email_id, "Please supply a valid email ID")
        response = await self.call_hq(
            "sdk_RemoveEmail",
            ExecuteStoredProcedure.SqlParameters([
                ExecuteStoredProcedure.SqlParam(
                    ExecuteStoredProcedure.DataType.Int, "email_id", email_id
                )
            ]),
        )
        # NOTE(review): presumably call_hq populates self.hq_response -- confirm.
        if self.hq_response.success:
            audit = AuditRecord(self.logger, self.hq_credentials)
            await audit.create(
                f"Email ID {email_id} deleted",
                online_session.session_id,
                workstation_id=online_session.workstation,
                customer_id=online_user.customer_id,
                user_id=online_user.user_id,
                user_logon_id=online_user.user_logon_id,
            )
        return response