Create Coreflow

The next steps will create a coreflow for the CorePro core. A coreflow builds functionality into an already existing core, or allows for a completely new core to be built if a base does not already exist. This allows developers to build new core calls or change behavior of already existing core calls to suit their needs without waiting on the SDK team to build a new feature into the q2-cores package.

Create a coreflow

Run the following command in the terminal to start the coreflow building process:

$ q2 create_coreflow -c CoreProExtended

The next prompt asks what core to use as a base. For this tutorial, select the CorePro core.

The next prompt asks the name of your coreflow operation. For this tutorial, use update_address

Note

If a specific core is not listed, then it either has not been built in the q2-cores package yet, or is is known by a different name. For example, DNA is known as OSI in the q2-cores package, or DNA could refer to CoreAPI. If you are unsure about the name of your core, please reach out to the SDK team. If your core is not listed, you can use the base option and build your specific core.

Notice that a new directory CoreProExtended has been created in the root of the repo, and a number of files were generated. The files and file structure of this new directory mirror the structure used in the q2-cores package. Also notice that settings.py has been changed to reflect the new core with the CUSTOM_CORE parameter. With that new parameter, the CORE value can be changed to the new coreflow. Change the CORE parameter to the below line:

CORE = "CoreProExtended"

A config file is also required for the new core flow. But, since this is a custom core, the configuration name should match the core name found in the CUSTOM_CORES dictionary:

$ q2 generate_config CoreProExtended

Before diving into the new generated files, the new coreflow should be imported. Comment out the core import line at the top of extension.py, and add the following line below the previous import statement:

from CoreProExtended.core import Core as CoreProExtendedCore

Also, change the helper function created in the previous step. It should reflect the CoreProCore to CoreProExtendedCore change:

@property
def core(self) -> CoreProExtendedCore:

    # noinspection PyTypeChecker
    return super().core

Render the form after these changes to ensure that the form is still pre-populating as expected.

The Coreflow Files

A number of files and directories were created in the new CoreProExtended module.

core.py

This file is the main class of the new coreflow module. It holds the Core class which is imported in the code, and it gives access to the queries, mappers, and models.

queries

This directory holds the query files. A query constructs a core call, and passes that call to a mapper for execution. The accepted convention is one core call per file.

mappers

This directory holds the mapper files. A mapper executes a query, or multiple queries, and parses the return. This is where the return is checked for errors, and the returned data is mapped to a model. That model is then returned. It is possible that multiple queries are needed to be run to get all the relevant data. A demographics call could consist of a “get entity” call, and a email/phone/address call in order to get a complete demographic object. Or, an “initial search” query might be necessary to get a unique value that identifies the end user before a demographic all can be made.

models

This directory holds the model files. A model is a class or data structure that contains the information from the returned core call. Models are not always necessary, but they are a convenient way to organize a large data set like demographic information.

Building a new core call starts with the query. For simplicity, assume this new core call takes a JSON object and responds with JSON. The response used in the mock_response method should be a known response to the core call. Replace the CoreProExtended/queries/update_address_query.py file’s content with the following code.

Note

Some Q2 core adapters will do post-processing on the incoming query shape to create valid SOAP shapes or to include configuration values. Looking at the already existing queries provided in the q2-cores will show a request shape that is expected and accepted by they Q2 adapter. For example, the JXchange core expects a valid SOAP shape, but the demographic call provided by the q2-cores package does not build a full SOAP shape, and it is the adapter that builds the shape. Instead, the JXchange query root node is used as the SOAP action and the xml is wrapped in a “passthru” node where a “servicename” attribute is provided to direct the call to the appropriate core URL. This can be viewed in Queries and in this adapter code snippet .. literalinclude:: code_snippets/create_coreflow/jxchange_adapter_snippet.cs

language:

cs

import json
import logging
from q2_sdk.models.cores.queries.base_query import BaseQuery


class UpdateAddressQuery(BaseQuery):
    """
    Handles construction of core requests as well as mock responses for testing.
    """

    def __init__(self, logger: logging.Logger, new_address: dict, users_ssn: str, api_key: str, mock_failure=False):
        """
        Object which builds the query to be send to the Core.
        If server is running debug mode, will return mocked response instead of calling the core.

        :param logger: Reference to calling request's logger (self.logger in your extension)
        :param new_address: The new address
        :param users_ssn: The end users social security number
        :param api_key: CorePro API key
        :param mock_failure: If server is running in debug mode, will be used while mocking the response
        """
        self.new_address = new_address
        self.users_ssn = users_ssn
        self.api_key = api_key
        super().__init__(logger, mock_failure)

    def build(self) -> str:
        """
        Constructs and returns the core request from a template and provided parameters.
        """
        update_address_call = {
            "new_address": {
                "address_1": self.new_address['address_1'],
                "address_2": self.new_address.get('address_2', ''),
                "city": self.new_address['city'],
                "state": self.new_address['state'],
                "zip": self.new_address['zip_code']
            },
            "user": self.users_ssn,
            "api_key": self.api_key
        }

        return json.dumps(update_address_call)

    def mock_response(self) -> str:
        """
        Constructs and returns a mock core response used for testing queries and mappers.
        """
        return json.dumps({"Success": True, "Error": 0})

Next, focus on the core.py file to make sure the query method is given all the values needed to build the core call. Notice configured_user and config. The configured_user is a copy of the self.online_user object created in the online extension. The config variable is a copy of the parent core’s configs, and any additional configs needed in this new coreflow. Replace the CoreProExtended/core.py file’s content with the following code

from .mappers.update_address import UpdateAddressMapper
from .queries.update_address_query import UpdateAddressQuery

from q2_cores.CorePro.core import Core as ParentCore


class Core(ParentCore):
    CONFIG_FILE_NAME = "CoreProExtended"
    REQUIRED_CONFIGURATIONS = {
        **ParentCore.REQUIRED_CONFIGURATIONS,
        **{
            "example_required_config": "This key is required."
        }
    }

    async def build_update_address(self, new_address) -> UpdateAddressMapper:
        """
        Other extensions will call this function to make this extension's core call.
        Returns a model representing the contents of the core response
        """
        query = UpdateAddressQuery(self.logger, new_address, self.configured_user.ssn, self.config.API_KEY)
        return UpdateAddressMapper([query], hq_credentials=self.hq_credentials, zone_context=self.core_user)

Using the mock_response method in the query, a model class can be constructed. Replace the CoreProExtended/models/update_address_model.py file’s content with the following code.



class UpdateAddressModel:
    """
    A python object representing the data contained in the core response
    """

    def __init__(self, success: bool, error: int, error_message: str = None):
        # Assign received parameters as class attributes
        self.success = success
        self.error = error
        self.error_message = error_message

Finishing up the coreflow is the mapper. The mapper takes the raw core response and creates a model object that is returned to the extension. Replace the CoreProExtended/mappers/update_address.py file’s content with the following code.

import json
from q2_sdk.models.cores.mappers.base_mapper import BaseMapper
from ..models.update_address_model import UpdateAddressModel
from ..queries.update_address_query import UpdateAddressQuery


class UpdateAddressMapper(BaseMapper):
    """
    Parse a stringified core response into a usable model or Python object
    """

    @staticmethod
    def parse_returned_queries(list_of_queries: list) -> UpdateAddressModel:
        """
        Given a list of core queries to be made, returns a model representing the contents of the core response
        """

        # Sanity checks, change if more than one or more than one type of query is needed
        assert len(list_of_queries) == 1
        assert isinstance(list_of_queries[0], UpdateAddressQuery), 'Query must be an instance of UpdateAddressQuery'

        response = list_of_queries[0].raw_core_response

        # Use lxml to convert the XML response into a nested python object
        root = json.loads(response)
        response_model = UpdateAddressModel(root['Success'], root['Error'], root.get("Error_Message"))

        return response_model

Lastly, focus on the submit function in the extension. This method should now call the new update address core call, check the response for success or error, and render the proper results page. Since everything is mocked in this example, the following code will only expect a success.

Replace the submit method in extension.py to the following

async def submit(self):
    """
    This route will be called when your form is submitted, as configured above.
    """
    new_address = {
        "address_1": self.form_fields['address_1'],
        "address_2": self.form_fields.get("address_2", ''),
        "city": self.form_fields['city'],
        "state": self.form_fields['state'],
        "zip_code": self.form_fields['zip_code']
    }
    update_address_object = await self.core.build_update_address(new_address)
    results = await update_address_object.execute()

    if not results.success:
        self.logger.error(f"{results.error}: {results.error_message}")
    template = self.get_template(
        'submit.html.jinja2',
        {
            'header': "AddressChange",
            'message': 'Successful address change',
            'data': vars(results)
        }
    )

    html = self.get_tecton_form(
        "AddressChange",
        custom_template=template,
        # Hide the submit button as there is no form on this route.
        hide_submit_button=True
    )

    return html

This last replace finishes the extension. This is now a working example of making a core call, and creating a new coreflow to extend an already existing core.

This completes the CoreProExtended coreflow and example extension. More information about cores can be found here: Core Integration