Source code for q2_sdk.core.q2_requests

"""
This module provides an easy interface for asynchronous web requests

The Requests library (https://requests.readthedocs.io/en/master/)
is a beautiful and well-known interface for making web requests
in Python. With that said, it doesn't play nicely with the asynchronous
features of Antilles. This is a thin wrapper module to achieve
the best of both worlds.

To utilize the Requests functionality with an existing ``requests.Session``
object (https://requests.readthedocs.io/en/master/user/advanced/?highlight=session#session-objects)
simply pass the session keyword argument to each HTTP method in this module.
If the "session" argument is omitted from the function call, then it will
use the default, non-session-aware requests method.
"""

import asyncio
import functools
import inspect
import logging
import re
import secrets
from http import HTTPStatus
from timeit import default_timer
from urllib.parse import urlparse

import requests

from q2_sdk.models.oauth_client import OAuthClient
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import BlacklistedUrlError
from q2_sdk.core.prometheus import MetricType, get_metric
from q2_sdk.core.opentelemetry.q2_requests import SpanContext
from q2_sdk.version import __version__


TEST_MODE = settings.TEST_MODE  # True when Q2SDK_TEST_MODE env var is True
# Error text raised/logged by ``verify_whitelist`` when a URL's network
# location matches no OUTBOUND_WHITELIST pattern; ``%s`` receives that netloc.
OUTBOUND_FAILURE_MSG = (
    "%s not in OUTBOUND_WHITELIST. "
    "Outbound calls are allowed at development time, "
    "but will be blocked in the Q2 datacenter. "
    "Please create a Q2 Support ticket at "
    "https://q2developer.com/support/create?ticketType=Whitelist "
    "to whitelist this url if you are planning "
    "to use it in staging/production, "
    "then add it to the OUTBOUND_WHITELIST variable in your "
    "settings file to suppress this message."
)
# Content types whose successful response headers/body are debug-logged by
# ``Q2RequestInterface.call``. Each entry is matched as a substring of the
# response's ``content-type`` header, so parameters such as ``; charset=...``
# do not prevent a match.
TEXT_CONTENT_TYPES = {
    "text/css",
    "text/csv",
    "text/html",
    "text/javascript",
    "text/plain",
    "text/xml",
    "application/json",
    "application/xml",
    "application/javascript",
    "application/soap+xml",
    "application/vnd.kafka.v2+json",
    "application/vnd.kafka.json.v2+json",
}
# Value used for the ``User-Agent`` header when the effective request headers
# do not already contain one.
USER_AGENT = f"python-q2-requests/{__version__}"


class CallIdLogAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes every message with a call identifier.

    The identifier is read from the ``call_id`` key of the ``extra`` mapping
    supplied when the adapter is constructed.
    """

    def process(self, msg, kwargs):
        """Return ``msg`` prefixed with the call id, plus ``kwargs`` unchanged.

        :param msg: The log message being emitted.
        :param kwargs: Keyword arguments of the logging call.
        """
        call_id = self.extra["call_id"]
        prefixed_msg = "{} {}".format(call_id, msg)
        return prefixed_msg, kwargs
class Q2RequestInterface:
    """Run one outbound HTTP call through ``requests`` off the event loop.

    The blocking ``requests`` call is handed to :func:`make_request`, which
    executes it in the event loop's default executor. Request/response
    details are logged through the supplied logger, each line prefixed with a
    random per-call id.
    """

    def __init__(self, logger, minimal_logging: bool = False):
        """
        :param logger: Logger used for request/response logging. A falsy value
            disables logging entirely.
        :param minimal_logging: When True, skip debug logging of request and
            (successful) response headers/bodies.
        """
        # Summary lines are logged at DEBUG unless the running entrypoint is
        # an installed entrypoint (or "run"), in which case they go to INFO.
        self.dynamic_log_level = logging.DEBUG
        info_level_entrypoints = settings.INSTALLED_ENTRYPOINTS + ["run"]
        if settings.RUNNING_ENTRYPOINT in info_level_entrypoints:
            self.dynamic_log_level = logging.INFO
        self.logger = logger
        self._minimal_logging = minimal_logging

    @staticmethod
    def _rewind_files(files) -> None:
        """Seek every seekable file object in a ``files`` argument back to 0.

        Accepts the shapes ``requests`` accepts: a mapping or an iterable of
        ``(name, value)`` pairs, where ``value`` is either a file-like object
        or a ``(filename, fileobj, ...)`` tuple. ``None``/empty is a no-op and
        values without a ``seek`` method are left alone.
        """
        if not files:
            return
        entries = files.items() if hasattr(files, "items") else files
        for _name, value in entries:
            if isinstance(value, (tuple, list)) and len(value) > 1:
                # (filename, fileobj[, content_type[, headers]]) form.
                fileobj = value[1]
            else:
                fileobj = value
            seek = getattr(fileobj, "seek", None)
            if callable(seek):
                seek(0)

    def _log_request(self, logger, url: str, verb: str, kwargs: dict) -> None:
        """Log the outgoing request; at DEBUG also log headers, body and URL."""
        params_msg = (
            f" Params: {kwargs.get('params')}" if kwargs.get("params") else ""
        )
        logger.log(
            self.dynamic_log_level,
            f"Sending HTTP request {verb.upper()} {url}{params_msg}",
        )
        if self._minimal_logging or not logger.isEnabledFor(logging.DEBUG):
            return

        # Build a throwaway prepared request purely so the effective headers,
        # body and URL can be logged. Only kwargs that ``requests.Request``
        # itself accepts are passed through.
        req_params = inspect.signature(requests.Request).parameters.keys()
        req = requests.Request(
            method=verb,
            url=url,
            **{k: v for k, v in kwargs.items() if k in req_params},
        )
        req = req.prepare()
        logger.debug("Request Headers: %s", req.headers)
        logger.debug("Request Body: %s", req.body)
        logger.debug("Effective URL: %s", req.url)
        # Preparing the request read any file objects to the end; rewind them
        # so the real request below still sends their content.
        self._rewind_files(kwargs.get("files"))

    def _log_response(self, logger, url: str, response, request_time: float) -> None:
        """Log the response summary, plus details for failures or text bodies."""
        if response.status_code < 400:
            log_method = logger.info
        elif response.status_code < 500:
            log_method = logger.warning
        else:
            log_method = logger.error

        try:
            http_phrase = HTTPStatus(response.status_code).phrase
        except ValueError:
            # Non-standard status code: no reason phrase available.
            http_phrase = ""
        msg = (
            f"Received HTTP response after {1000 * request_time:.2f}ms - "
            f"{response.status_code} {http_phrase}"
        )
        logger.log(self.dynamic_log_level, msg)

        if not response.ok:
            log_method("Response URL: %s", url)
            log_method("Response Headers: %s", response.headers)
            log_method("Response Body: %s", response.content)
        elif not self._minimal_logging:
            content_type = response.headers.get("content-type")
            if not content_type:
                logger.debug(
                    "Response did not have a content-type header, will not log content"
                )
            elif any(valid_type in content_type for valid_type in TEXT_CONTENT_TYPES):
                logger.debug("Response Headers: %s", response.headers)
                logger.debug("Response Body: %s", response.content)
            else:
                logger.debug(
                    'Response of type "%s" not logging content', content_type
                )

    async def call(self, url: str, verb: str, **kwargs):
        """Send an HTTP request and return the ``requests`` response.

        :param url: Target URL.
        :param verb: HTTP verb (case-insensitive); one of get, post, put,
            delete, options, head, patch.
        :param \*\*kwargs: Arguments forwarded to the underlying ``requests``
            call. The following keys are consumed here and not forwarded:
            ``session`` (a ``requests.Session`` to send through),
            ``oauth_client`` (an ``OAuthClient`` whose token header is merged
            into the request headers), ``retry_if_unauth`` (retry once after
            clearing the token when the response status is one of the
            client's ``unauthorized_status_codes``; defaults to True when an
            ``oauth_client`` is given), ``bt_handle``, and the test-mode
            options ``mock_response``, ``return_success`` and ``status_code``.
            A missing/falsy ``timeout`` is replaced by the configured default
            ``(connect, read)`` timeouts.
        :raises AssertionError: if ``verb`` is not a supported verb.
        :raises TypeError: if ``oauth_client`` is not an ``OAuthClient``.
        """
        call_id = secrets.token_hex(4)
        logger = None
        if self.logger:
            logger = CallIdLogAdapter(self.logger, {"call_id": call_id})

        verb = verb.lower()
        available_verbs = ("get", "post", "put", "delete", "options", "head", "patch")
        if verb not in available_verbs:
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped under ``python -O``; the exception type is unchanged.
            raise AssertionError("Verb must be in %s" % str(available_verbs))

        if logger:
            self._log_request(logger, url, verb, kwargs)

        session: requests.Session | None = kwargs.pop("session", None)
        oauth_client = kwargs.pop("oauth_client", None)
        headers = session.headers if session else kwargs.get("headers") or {}
        retry_if_unauth = kwargs.pop("retry_if_unauth", oauth_client is not None)
        bt_handle = kwargs.pop("bt_handle", None)
        mock_response = kwargs.pop("mock_response", None)
        return_success = kwargs.pop("return_success", True)
        status_code = kwargs.pop("status_code", None)

        if not kwargs.get("timeout"):
            kwargs["timeout"] = (
                settings.Q2REQUESTS_DEFAULT_CONNECT_TIMEOUT,
                settings.Q2REQUESTS_DEFAULT_TIMEOUT,
            )

        if "User-Agent" not in headers:
            headers["User-Agent"] = USER_AGENT

        if oauth_client:
            if not isinstance(oauth_client, OAuthClient):
                raise TypeError("oauth_client must inherit from OAuthClient base class")
            headers = {
                **headers,
                **(await oauth_client.get_token_obj()).serialize_as_header(),
            }

        # Write the effective headers back to wherever they will be read from.
        if session:
            session.headers = headers
        else:
            kwargs["headers"] = headers

        if TEST_MODE:
            # Imported lazily so the testing helpers are only loaded in test mode.
            from q2_sdk.tools.testing.models import Q2RequestMock

            request_mock = Q2RequestMock(
                verb,
                mock_response=mock_response,
                return_success=return_success,
                status_code=status_code,
                headers=kwargs.get("headers"),
            )
            func = functools.partial(request_mock.call, url, **kwargs)
        elif session:
            func = functools.partial(getattr(session, verb), url, **kwargs)
        else:
            func = functools.partial(getattr(requests, verb), url, **kwargs)

        start_time = default_timer()
        response = await make_request(func, bt_handle=bt_handle)
        end_time = default_timer()
        request_time = end_time - start_time

        if (
            oauth_client is not None
            and response.status_code in oauth_client.unauthorized_status_codes
            and retry_if_unauth
        ):
            oauth_client.clear_token()
            kwargs["retry_if_unauth"] = False
            # ``session`` and ``bt_handle`` were popped from kwargs above, so
            # they must be passed again explicitly; otherwise the retry would
            # bypass the caller's session (cookies, adapters, TLS settings).
            # NOTE(review): the test-mode mock arguments are intentionally not
            # forwarded, matching the previous retry behaviour.
            return await self.call(
                url,
                verb,
                oauth_client=oauth_client,
                session=session,
                bt_handle=bt_handle,
                **kwargs,
            )

        parsed_url = urlparse(url)
        # Drop any "user:password@" prefix before using the host as a label.
        clean_url = parsed_url.netloc.split("@")[-1]
        get_metric(
            MetricType.Histogram,
            "caliper_http_requests",
            "Outbound HTTP traffic",
            {"method": verb, "endpoint": clean_url, "scheme": parsed_url.scheme},
            chain={"op": "observe", "params": [request_time]},
        )

        if logger:
            self._log_response(logger, url, response, request_time)

        return response


def verify_whitelist(function):
    """Decorate an async request helper with an OUTBOUND_WHITELIST check.

    The wrapper accepts an extra ``verify_whitelist`` keyword (default True)
    which is removed before ``function`` is called. When ``settings.DEBUG`` is
    truthy and verification is enabled, the network location of the ``url``
    argument must match a pattern in ``settings.OUTBOUND_WHITELIST``;
    otherwise the failure message is logged (if a logger was passed) and
    ``BlacklistedUrlError`` is raised. ``function`` must take parameters named
    ``logger`` and ``url``.
    """

    def _check_whitelist(url):
        """
        The purpose of this is not security, but rather
        to give an early warning of urls that will need to be
        whitelisted through Q2's networking layer
        """
        for pattern in settings.OUTBOUND_WHITELIST:
            # Patterns are regular expressions; any scheme prefix is ignored
            # because only the netloc is matched.
            pattern = re.sub(r"https?://", "", pattern)
            if re.search(pattern, url):
                return
        raise BlacklistedUrlError

    @functools.wraps(function)
    async def wrapper(*args, **kwargs):
        verify = kwargs.pop("verify_whitelist", True)
        if settings.DEBUG and verify:
            bound_args = inspect.signature(function).bind(*args, **kwargs).arguments
            logger = bound_args["logger"]
            url = bound_args["url"]
            base_url = urlparse(url).netloc
            try:
                _check_whitelist(base_url)
            except BlacklistedUrlError as err:
                error_msg = OUTBOUND_FAILURE_MSG % base_url
                if logger:
                    logger.error(error_msg)
                raise BlacklistedUrlError(error_msg) from err
        response = await function(*args, **kwargs)
        return response

    return wrapper


async def make_request(func: functools.partial, bt_handle=None):
    """Run the blocking request callable ``func`` in the default executor.

    The call is made inside a ``SpanContext`` and the response is recorded on
    it before being returned.

    :param func: Zero-argument callable (a ``functools.partial`` around a
        ``requests`` call) that performs the request.
    :param bt_handle: Accepted for interface compatibility; not used here.
    """
    # Pre-bound so a value exists even if the span context suppresses an error.
    response = None
    with SpanContext(func) as ctx:
        # We are inside a coroutine, so a running loop is guaranteed.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, func)
        ctx.record_response(response)
    return response
@verify_whitelist
async def get(
    logger, url, params=None, session=None, minimal_logging: bool = False, **kwargs
) -> requests.Response:
    r"""Issue an asynchronous GET request.

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param url: URL to request.
    :param params: (optional) Dictionary or bytes to send in the query string.
    :param session: (optional) :class:`requests.Session` object to send the request through.
    :param minimal_logging: (optional) flag to turn off debug logging of request header/body/etc.
    :param \*\*kwargs: Optional arguments that ``request`` takes.
    :return: The :class:`requests.Response` for the call.
    """
    kwargs.update(params=params, session=session)
    interface = Q2RequestInterface(logger, minimal_logging)
    response = await interface.call(url, "get", **kwargs)
    return response
@verify_whitelist
async def options(
    logger, url, session=None, minimal_logging: bool = False, **kwargs
) -> requests.Response:
    r"""Sends an OPTIONS request.

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param url: URL for the new :class:`Request` object.
    :param session: (optional) :class:`requests.Session` object to base the request off of.
    :param minimal_logging: (optional) flag to turn off debug logging of request header/body/etc.
    :param \*\*kwargs: Optional arguments that ``request`` takes.
    :return: The :class:`requests.Response` for the call.
    """
    # ``session`` is carried in kwargs; Q2RequestInterface.call pops it there.
    kwargs["session"] = session
    request_obj = Q2RequestInterface(logger, minimal_logging)
    return await request_obj.call(url, "options", **kwargs)
@verify_whitelist
async def post(
    logger,
    url,
    data=None,
    json=None,
    session=None,
    minimal_logging: bool = False,
    **kwargs,
) -> requests.Response:
    r"""Issue an asynchronous POST request.

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param url: URL to request.
    :param data: (optional) Dictionary (will be form-encoded), bytes, or file-like
        object to send in the request body.
    :param json: (optional) json data to send in the request body.
    :param session: (optional) :class:`requests.Session` object to send the request through.
    :param minimal_logging: (optional) flag to turn off debug logging of request header/body/etc.
    :param \*\*kwargs: Optional arguments that ``request`` takes.
    :return: The :class:`requests.Response` for the call.
    """
    kwargs.update(session=session, json=json, data=data)
    interface = Q2RequestInterface(logger, minimal_logging)
    response = await interface.call(url, "post", **kwargs)
    return response
@verify_whitelist
async def put(
    logger, url, data=None, session=None, minimal_logging: bool = False, **kwargs
) -> requests.Response:
    r"""Issue an asynchronous PUT request.

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param url: URL to request.
    :param data: (optional) Dictionary (will be form-encoded), bytes, or file-like
        object to send in the request body.
    :param json: (optional) json data to send in the request body. Not a named
        parameter of this function; when supplied it travels through ``**kwargs``.
    :param session: (optional) :class:`requests.Session` object to send the request through.
    :param minimal_logging: (optional) flag to turn off debug logging of request header/body/etc.
    :param \*\*kwargs: Optional arguments that ``request`` takes.
    :return: The :class:`requests.Response` for the call.
    """
    kwargs.update(session=session, data=data)
    interface = Q2RequestInterface(logger, minimal_logging)
    response = await interface.call(url, "put", **kwargs)
    return response
@verify_whitelist
async def patch(
    logger, url, data=None, session=None, minimal_logging: bool = False, **kwargs
) -> requests.Response:
    r"""Issue an asynchronous PATCH request.

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param url: URL to request.
    :param data: (optional) Dictionary (will be form-encoded), bytes, or file-like
        object to send in the request body.
    :param json: (optional) json data to send in the request body. Not a named
        parameter of this function; when supplied it travels through ``**kwargs``.
    :param session: (optional) :class:`requests.Session` object to send the request through.
    :param minimal_logging: (optional) flag to turn off debug logging of request header/body/etc.
    :param \*\*kwargs: Optional arguments that ``request`` takes.
    :return: The :class:`requests.Response` for the call.
    """
    kwargs.update(session=session, data=data)
    interface = Q2RequestInterface(logger, minimal_logging)
    response = await interface.call(url, "patch", **kwargs)
    return response
@verify_whitelist
async def delete(
    logger, url, session=None, minimal_logging: bool = False, **kwargs
) -> requests.Response:
    r"""Issue an asynchronous DELETE request.

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param url: URL to request.
    :param session: (optional) :class:`requests.Session` object to send the request through.
    :param minimal_logging: (optional) flag to turn off debug logging of request header/body/etc.
    :param \*\*kwargs: Optional arguments that ``request`` takes.
    :return: The :class:`requests.Response` for the call.
    """
    kwargs.update(session=session)
    interface = Q2RequestInterface(logger, minimal_logging)
    response = await interface.call(url, "delete", **kwargs)
    return response