Source code for q2_sdk.core.q2_requests

"""
This module provides an easy interface for asynchronous web requests

The Requests library (https://requests.readthedocs.io/en/master/)
is a beautiful and well known interface for making web requests
in python. With that said, it doesn't play nice with the asynchronous
features of Antilles. This is a thin wrapper module to achieve
the best of both worlds.

To utilize the Requests functionality with an existing ``requests.Session``
object (https://requests.readthedocs.io/en/master/user/advanced/?highlight=session#session-objects)
simply pass the session keyword argument to each HTTP method in this module.
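If the "session" argument is omitted, a pooled ``requests.Session`` shared per
origin (scheme and network location) is reused instead (see ``SessionPool``);
pass ``use_session_pool=False`` to use the plain, session-less ``requests``
functions.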
"""

import asyncio
import functools
import inspect
import logging
import re
import secrets
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from http import HTTPStatus
from timeit import default_timer
from typing import Optional
from urllib.parse import urlparse

import requests

from q2_sdk.models.oauth_client import OAuthClient
from q2_sdk.core.configuration import settings
from q2_sdk.core.exceptions import BlacklistedUrlError
from q2_sdk.core.prometheus import MetricType, get_metric
from q2_sdk.core.opentelemetry.q2_requests import SpanContext
from q2_sdk.version import __version__


# Snapshot of settings.TEST_MODE taken at import time; when true,
# Q2RequestInterface.call sends requests through Q2RequestMock instead of
# the network.
TEST_MODE = settings.TEST_MODE  # True when Q2SDK_TEST_MODE env var is True
# %-format template with a single "%s" placeholder (the URL's network
# location). verify_whitelist logs and raises it when a DEBUG-mode call
# targets a host that no OUTBOUND_WHITELIST pattern matches.
OUTBOUND_FAILURE_MSG = (
    "%s not in OUTBOUND_WHITELIST. "
    "Outbound calls are allowed at development time, "
    "but will be blocked in the Q2 datacenter. "
    "Please create a Q2 Support ticket at "
    "https://q2developer.com/support/create?ticketType=Whitelist "
    "to whitelist this url if you are planning "
    "to use it in staging/production, "
    "then add it to the OUTBOUND_WHITELIST variable in your "
    "settings file to suppress this message."
)
# Content types treated as text. A successful response whose content-type
# header contains one of these substrings has its headers and body logged at
# DEBUG; for any other content type only the type itself is logged.
TEXT_CONTENT_TYPES = {
    "text/css",
    "text/csv",
    "text/html",
    "text/javascript",
    "text/plain",
    "text/xml",
    "application/json",
    "application/xml",
    "application/javascript",
    "application/soap+xml",
    "application/vnd.kafka.v2+json",
    "application/vnd.kafka.json.v2+json",
}
# User-Agent sent when the headers being sent do not already contain one.
USER_AGENT = f"python-q2-requests/{__version__}"


@dataclass
class SessionEntry:
    """One pooled ``requests.Session`` plus the time it stops being reusable."""

    session: requests.Session
    # Naive local wall-clock deadline; SessionPool pushes it forward on reuse.
    expires_at: datetime

    @property
    def is_expired(self) -> bool:
        """True once the current time is later than ``expires_at``."""
        return self.expires_at < datetime.now()
@dataclass
class SessionPool:
    """Maintains a pool of reusable requests.Session objects keyed by origin
    (scheme+host), or by an explicit key override.

    Reusing sessions avoids rebuilding TCP connections on repeated calls to
    the same host. Sessions are evicted when idle for longer than
    idle_timeout seconds, or when the pool reaches max_size and a new origin
    is requested (least recently used is evicted first). Evicted sessions are
    closed so their pooled connections are released right away instead of
    lingering until garbage collection.
    """

    max_size: int = settings.MAX_SESSION_POOL_SIZE
    idle_timeout: float = settings.SESSION_POOL_TIMEOUT
    sessions: dict[str, SessionEntry] = field(default_factory=dict)

    def _deadline(self) -> datetime:
        """Expiry time for a session used right now."""
        return datetime.now() + timedelta(seconds=self.idle_timeout)

    def expire_idle_sessions(self) -> None:
        """Drop, and close, every session whose idle deadline has passed."""
        active = {}
        for origin, entry in self.sessions.items():
            if entry.is_expired:
                # Release the session's pooled connections; an in-flight
                # request on it still completes, its connection is just not
                # returned to the (now closed) pool.
                entry.session.close()
            else:
                active[origin] = entry
        self.sessions = active

    def get_active(self) -> dict[str, SessionEntry]:
        """Return the live (non-expired) sessions, keyed by origin."""
        self.expire_idle_sessions()
        return self.sessions

    def get_session(
        self, url: str, key_override: Optional[str] = None
    ) -> requests.Session:
        """Return the pooled session for ``url``'s origin, creating it if needed.

        :param url: URL whose ``scheme://netloc`` is the pool key.
        :param key_override: When not None, used as the pool key instead.
        """
        if key_override is not None:
            origin = key_override
        else:
            parsed = urlparse(url)
            origin = f"{parsed.scheme}://{parsed.netloc}"

        self.expire_idle_sessions()
        entry = self.sessions.get(origin)
        if entry is not None:
            entry.expires_at = self._deadline()
            return entry.session

        # Evict least-recently-used sessions (earliest deadline) until there
        # is room; `while` + emptiness guard also copes with max_size <= 0.
        while self.sessions and len(self.sessions) >= self.max_size:
            oldest = min(self.sessions, key=lambda key: self.sessions[key].expires_at)
            self.sessions.pop(oldest).session.close()

        session = requests.Session()
        # requests.Session ships a "python-requests/x" User-Agent by default,
        # which would otherwise mask the SDK's own USER_AGENT for every
        # pooled request.
        session.headers["User-Agent"] = USER_AGENT
        self.sessions[origin] = SessionEntry(session=session, expires_at=self._deadline())
        return session
# Process-wide pool used by Q2RequestInterface.call whenever no explicit
# ``session`` is passed and ``use_session_pool`` is left True.
SESSION_POOL = SessionPool()
class CallIdLogAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes each message with ``extra['call_id']``."""

    def process(self, msg, kwargs):
        """Return ``"<call_id> <msg>"`` and the untouched logging kwargs."""
        call_id = self.extra["call_id"]
        prefixed = "{} {}".format(call_id, msg)
        return prefixed, kwargs
class Q2RequestInterface:
    """Perform one outbound HTTP request without blocking the event loop.

    Every log line of a call is prefixed with a random call id (via
    ``CallIdLogAdapter``), a ``caliper_http_requests`` histogram sample is
    recorded, OAuth headers are optionally attached, and the blocking
    ``requests`` call runs in the default executor through ``make_request``.
    """

    def __init__(self, logger, minimal_logging: bool = False):
        """
        :param logger: Logger to report through; a falsy value disables logging.
        :param minimal_logging: When True, skip DEBUG dumps of request/response
            headers and bodies (error responses are still dumped).
        """
        # Request/response summary lines are promoted to INFO for installed
        # entrypoints and "run"; otherwise they are logged at DEBUG.
        self.dynamic_log_level = logging.DEBUG
        info_level_entrypoints = settings.INSTALLED_ENTRYPOINTS + ["run"]
        if settings.RUNNING_ENTRYPOINT in info_level_entrypoints:
            self.dynamic_log_level = logging.INFO
        self.logger = logger
        self._minimal_logging = minimal_logging

    async def call(self, url: str, verb: str, **kwargs):
        """Send an HTTP ``verb`` request to ``url`` and return the response.

        Keyword arguments understood by ``requests`` are forwarded to it. These
        q2-specific options are consumed here and never forwarded:

        - ``session``: explicit ``requests.Session`` to send through.
        - ``use_session_pool`` (default True): with no explicit session, borrow
          one from ``SESSION_POOL``; when False use the ``requests`` functions.
        - ``session_key_override``: pool key to use instead of scheme+host.
        - ``oauth_client``: ``OAuthClient`` whose token is sent as headers.
        - ``retry_if_unauth`` (default: True when ``oauth_client`` is given):
          on an unauthorized status, clear the token and retry once.
        - ``mock_response``, ``return_success``, ``status_code``: passed to
          ``Q2RequestMock`` in TEST_MODE.
        - ``report_otel_errors`` (default True): passed to ``make_request``.

        Neither the caller's ``headers`` dict nor the session's headers are
        modified: User-Agent and OAuth headers are added per request only, so
        a pooled session shared by other callers never carries a token.

        :raises TypeError: if ``oauth_client`` is not an ``OAuthClient``.
        """
        call_id = secrets.token_hex(4)
        logger = (
            CallIdLogAdapter(self.logger, {"call_id": call_id}) if self.logger else None
        )

        verb = verb.lower()
        available_verbs = ("get", "post", "put", "delete", "options", "head", "patch")
        assert verb in available_verbs, "Verb must be in %s" % str(available_verbs)

        if logger:
            self._log_request(logger, verb, url, kwargs)

        # The DEBUG prepare() in _log_request, or a previous attempt on an
        # OAuth retry, may already have read the file objects.
        self._rewind_files(kwargs.get("files"))

        explicit_session = kwargs.pop("session", None)
        use_session_pool = kwargs.pop("use_session_pool", True)
        # Popped unconditionally so it is never forwarded to requests.
        session_key_override = kwargs.pop("session_key_override", None)
        session = explicit_session
        if session is None and use_session_pool:
            session = SESSION_POOL.get_session(url, session_key_override)
        oauth_client = kwargs.pop("oauth_client", None)
        retry_if_unauth = kwargs.pop("retry_if_unauth", oauth_client is not None)
        mock_response = kwargs.pop("mock_response", None)
        return_success = kwargs.pop("return_success", True)
        status_code = kwargs.pop("status_code", None)
        report_otel_errors = kwargs.pop("report_otel_errors", True)
        if not kwargs.get("timeout"):
            kwargs["timeout"] = (
                settings.Q2REQUESTS_DEFAULT_CONNECT_TIMEOUT,
                settings.Q2REQUESTS_DEFAULT_TIMEOUT,
            )

        caller_headers = kwargs.get("headers")
        request_headers = await self._build_headers(caller_headers, session, oauth_client)
        # Keep the caller's own value (e.g. None) when nothing had to be added.
        kwargs["headers"] = request_headers or caller_headers

        if TEST_MODE:
            from q2_sdk.tools.testing.models import Q2RequestMock

            request_mock = Q2RequestMock(
                verb,
                mock_response=mock_response,
                return_success=return_success,
                status_code=status_code,
                headers=kwargs.get("headers"),
            )
            func = functools.partial(request_mock.call, url, **kwargs)
        elif session is not None:
            func = functools.partial(getattr(session, verb), url, **kwargs)
        else:
            func = functools.partial(getattr(requests, verb), url, **kwargs)

        start_time = default_timer()
        response = await make_request(func, report_otel_errors=report_otel_errors)
        request_time = default_timer() - start_time

        if (
            oauth_client is not None
            and retry_if_unauth
            and response.status_code in oauth_client.unauthorized_status_codes
        ):
            # Unauthorized: drop the cached token and try exactly once more.
            # Every option popped above is re-supplied so the retry goes
            # through the same session / pool / mock as the first attempt.
            oauth_client.clear_token()
            retry_kwargs = {
                **kwargs,
                "headers": caller_headers,
                "session": explicit_session,
                "use_session_pool": use_session_pool,
                "session_key_override": session_key_override,
                "retry_if_unauth": False,
                "mock_response": mock_response,
                "return_success": return_success,
                "status_code": status_code,
                "report_otel_errors": report_otel_errors,
            }
            return await self.call(url, verb, oauth_client=oauth_client, **retry_kwargs)

        parsed_url = urlparse(url)
        # Drop any "user:password@" prefix before using the host as a label.
        clean_url = parsed_url.netloc.split("@")[-1]
        get_metric(
            MetricType.Histogram,
            "caliper_http_requests",
            "Outbound HTTP traffic",
            {"method": verb, "endpoint": clean_url, "scheme": parsed_url.scheme},
            chain={"op": "observe", "params": [request_time]},
        )

        if logger:
            self._log_response(logger, response, url, request_time)
        return response

    async def _build_headers(self, caller_headers, session, oauth_client) -> dict:
        """Return a fresh per-request header dict (User-Agent + OAuth token).

        Works on a copy so neither ``caller_headers`` nor ``session.headers``
        (possibly a pooled session shared with other callers) is modified.
        """
        headers = dict(caller_headers or {})
        # Header names are case-insensitive; the caller may use "user-agent".
        has_user_agent = any(name.lower() == "user-agent" for name in headers)
        if session is not None and "User-Agent" in session.headers:
            has_user_agent = True
        if not has_user_agent:
            headers["User-Agent"] = USER_AGENT
        if oauth_client:
            if not isinstance(oauth_client, OAuthClient):
                raise TypeError("oauth_client must inherit from OAuthClient base class")
            token = await oauth_client.get_token_obj()
            headers.update(token.serialize_as_header())
        return headers

    def _log_request(self, logger, verb: str, url: str, kwargs: dict) -> None:
        """Log the outgoing request; at DEBUG also dump the prepared request."""
        params = kwargs.get("params")
        params_msg = f" Params: {params}" if params else ""
        logger.log(
            self.dynamic_log_level,
            f"Sending HTTP request {verb.upper()} {url}{params_msg}",
        )
        if self._minimal_logging or not logger.isEnabledFor(logging.DEBUG):
            return
        # Prepare a throwaway Request purely for logging, passing only the
        # arguments requests.Request accepts. This reads any file objects,
        # which is why call() rewinds them afterwards.
        request_params = inspect.signature(requests.Request).parameters
        prepared = requests.Request(
            method=verb,
            url=url,
            **{k: v for k, v in kwargs.items() if k in request_params},
        ).prepare()
        logger.debug("Request Headers: %s", prepared.headers)
        logger.debug("Request Body: %s", prepared.body)
        logger.debug("Effective URL: %s", prepared.url)

    def _log_response(self, logger, response, url: str, request_time: float) -> None:
        """Log the response summary, plus details for errors / DEBUG text bodies."""
        status = response.status_code
        if status < 400:
            log_method = logger.info
        elif status < 500:
            log_method = logger.warning
        else:
            log_method = logger.error
        try:
            http_phrase = HTTPStatus(status).phrase
        except ValueError:
            # Non-standard status codes have no registered phrase.
            http_phrase = ""
        logger.log(
            self.dynamic_log_level,
            f"Received HTTP response after {1000 * request_time:.2f}ms - "
            f"{status} {http_phrase}",
        )
        if not response.ok:
            log_method("Response URL: %s", url)
            log_method("Response Headers: %s", response.headers)
            log_method("Response Body: %s", response.content)
            return
        if self._minimal_logging:
            return
        content_type = response.headers.get("content-type")
        if not content_type:
            logger.debug(
                "Response did not have a content-type header, will not log content"
            )
        elif any(valid_type in content_type for valid_type in TEXT_CONTENT_TYPES):
            logger.debug("Response Headers: %s", response.headers)
            logger.debug("Response Body: %s", response.content)
        else:
            logger.debug('Response of type "%s" not logging content', content_type)

    @staticmethod
    def _rewind_files(files) -> None:
        """Seek every file object in a requests-style ``files`` argument to 0.

        ``files`` may be a mapping or a list of ``(field, value)`` pairs, where
        ``value`` is a file object, raw content, or a
        ``(filename, fileobj[, content_type[, headers]])`` tuple. Values with
        no ``seek`` attribute (raw bytes/str) are skipped.
        """
        if not files:
            return
        pairs = files.items() if hasattr(files, "items") else files
        for _field, value in pairs:
            if isinstance(value, (tuple, list)) and len(value) > 1:
                value = value[1]
            seek = getattr(value, "seek", None)
            if seek is None:
                continue
            try:
                seek(0)
            except OSError:
                # e.g. io.UnsupportedOperation for pipes/sockets: such streams
                # cannot be rewound and are sent from their current position.
                pass


def verify_whitelist(function):
    """Decorator: in DEBUG, reject calls whose URL host matches no whitelist entry.

    The decorated coroutine must accept ``logger`` and ``url`` parameters.
    Callers may pass ``verify_whitelist=False`` to skip the check; that keyword
    is consumed here and not forwarded.

    :raises BlacklistedUrlError: when the check runs and no pattern matches.
    """
    # Computed once per decorated function rather than on every call.
    signature = inspect.signature(function)

    @functools.wraps(function)
    async def wrapper(*args, **kwargs):
        verify = kwargs.pop("verify_whitelist", True)
        if settings.DEBUG and verify:
            bound_args = signature.bind(*args, **kwargs).arguments
            logger = bound_args["logger"]
            url = bound_args["url"]
            base_url = urlparse(url).netloc
            try:
                _check_whitelist(base_url)
            except BlacklistedUrlError as err:
                error_msg = OUTBOUND_FAILURE_MSG % base_url
                if logger:
                    logger.error(error_msg)
                raise BlacklistedUrlError(error_msg) from err
        return await function(*args, **kwargs)

    def _check_whitelist(url):
        """
        The purpose of this is not security, but rather to give an early warning
        of urls that will need to be whitelisted through Q2's networking layer
        """
        for pattern in settings.OUTBOUND_WHITELIST:
            # Whitelist entries may include a scheme; match on host only.
            pattern = re.sub(r"https?://", "", pattern)
            if re.search(pattern, url):
                return
        raise BlacklistedUrlError

    return wrapper


async def make_request(func: functools.partial, report_otel_errors: bool = True):
    """Run the blocking ``func`` in the default executor inside a tracing span.

    :param func: Zero-argument callable performing the HTTP request.
    :param report_otel_errors: Passed through to ``SpanContext``.
    :returns: Whatever ``func`` returns (a response object).
    """
    # Pre-bound so a value is defined even if SpanContext swallows an error.
    response = None
    with SpanContext(func, report_otel_errors=report_otel_errors) as ctx:
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, func)
        ctx.record_response(response)
    return response
@verify_whitelist
async def get(
    logger,
    url,
    params=None,
    session=None,
    minimal_logging: bool = False,
    **kwargs,
) -> requests.Response:
    r"""Issue an HTTP GET without blocking the event loop.

    :param logger: Logger of the calling request (``self.logger`` in an extension).
    :param url: Target URL for the :class:`Request`.
    :param params: (optional) Dictionary or bytes encoded into the query string.
    :param session: (optional) :class:`requests.Session` to send the request through.
    :param minimal_logging: (optional) Skip DEBUG dumps of request/response
        headers and bodies.
    :param \*\*kwargs: Other keyword arguments accepted by ``requests``, plus
        the q2 options understood by ``Q2RequestInterface.call``.
    """
    kwargs.update(params=params, session=session)
    interface = Q2RequestInterface(logger, minimal_logging)
    return await interface.call(url, "get", **kwargs)
@verify_whitelist
async def options(
    logger,
    url,
    session=None,
    minimal_logging: bool = False,
    **kwargs,
) -> requests.Response:
    r"""Sends an OPTIONS request.

    :param logger: Reference to calling request's logger (self.logger in your extension)
    :param url: URL for the new :class:`Request` object.
    :param session: (optional) :class:`requests.Session` object to base the request off of.
    :param minimal_logging: (optional) flag to turn off debug logging of request header/body/etc.
    :param \*\*kwargs: Optional arguments that ``request`` takes, plus the q2
        options understood by ``Q2RequestInterface.call``.
    :return: The :class:`requests.Response`.
    """
    kwargs["session"] = session
    request_obj = Q2RequestInterface(logger, minimal_logging)
    return await request_obj.call(url, "options", **kwargs)
@verify_whitelist
async def post(
    logger,
    url,
    data=None,
    json=None,
    session=None,
    minimal_logging: bool = False,
    **kwargs,
) -> requests.Response:
    r"""Issue an HTTP POST without blocking the event loop.

    :param logger: Logger of the calling request (``self.logger`` in an extension).
    :param url: Target URL for the :class:`Request`.
    :param data: (optional) Dictionary (form-encoded), bytes, or file-like object
        sent as the body.
    :param json: (optional) JSON-serializable data sent as the body.
    :param session: (optional) :class:`requests.Session` to send the request through.
    :param minimal_logging: (optional) Skip DEBUG dumps of request/response
        headers and bodies.
    :param \*\*kwargs: Other keyword arguments accepted by ``requests``, plus
        the q2 options understood by ``Q2RequestInterface.call``.
    """
    kwargs.update(session=session, json=json, data=data)
    interface = Q2RequestInterface(logger, minimal_logging)
    return await interface.call(url, "post", **kwargs)
@verify_whitelist
async def put(
    logger,
    url,
    data=None,
    session=None,
    minimal_logging: bool = False,
    **kwargs,
) -> requests.Response:
    r"""Issue an HTTP PUT without blocking the event loop.

    :param logger: Logger of the calling request (``self.logger`` in an extension).
    :param url: Target URL for the :class:`Request`.
    :param data: (optional) Dictionary (form-encoded), bytes, or file-like object
        sent as the body.
    :param session: (optional) :class:`requests.Session` to send the request through.
    :param minimal_logging: (optional) Skip DEBUG dumps of request/response
        headers and bodies.
    :param \*\*kwargs: Other keyword arguments accepted by ``requests`` (for
        example ``json`` for a JSON body), plus the q2 options understood by
        ``Q2RequestInterface.call``.
    """
    kwargs.update(session=session, data=data)
    interface = Q2RequestInterface(logger, minimal_logging)
    return await interface.call(url, "put", **kwargs)
@verify_whitelist
async def patch(
    logger,
    url,
    data=None,
    session=None,
    minimal_logging: bool = False,
    **kwargs,
) -> requests.Response:
    r"""Issue an HTTP PATCH without blocking the event loop.

    :param logger: Logger of the calling request (``self.logger`` in an extension).
    :param url: Target URL for the :class:`Request`.
    :param data: (optional) Dictionary (form-encoded), bytes, or file-like object
        sent as the body.
    :param session: (optional) :class:`requests.Session` to send the request through.
    :param minimal_logging: (optional) Skip DEBUG dumps of request/response
        headers and bodies.
    :param \*\*kwargs: Other keyword arguments accepted by ``requests`` (for
        example ``json`` for a JSON body), plus the q2 options understood by
        ``Q2RequestInterface.call``.
    """
    kwargs.update(session=session, data=data)
    interface = Q2RequestInterface(logger, minimal_logging)
    return await interface.call(url, "patch", **kwargs)
@verify_whitelist
async def delete(
    logger,
    url,
    session=None,
    minimal_logging: bool = False,
    **kwargs,
) -> requests.Response:
    r"""Issue an HTTP DELETE without blocking the event loop.

    :param logger: Logger of the calling request (``self.logger`` in an extension).
    :param url: Target URL for the :class:`Request`.
    :param session: (optional) :class:`requests.Session` to send the request through.
    :param minimal_logging: (optional) Skip DEBUG dumps of request/response
        headers and bodies.
    :param \*\*kwargs: Other keyword arguments accepted by ``requests``, plus
        the q2 options understood by ``Q2RequestInterface.call``.
    """
    kwargs.update(session=session)
    interface = Q2RequestInterface(logger, minimal_logging)
    return await interface.call(url, "delete", **kwargs)