Source code for q2_sdk.core.prometheus

"""
Prometheus is a monitoring solution we use at Q2 for keeping track
of the stats that matter most to a running service.

https://prometheus.io/

There are a few gotchas in working with Prometheus that we've attempted
to abstract away with this module. To use it will be something like
this:


.. code-block:: python

    prometheus.get_metric(
        MetricType.Counter,
        'caliper_endpoints',
        'Number of times called',
        labels={'endpoint': self.extension_name}
    ).inc()

"""

from enum import Enum as PythonEnum
from typing import Optional

from prometheus_client import Counter, Histogram, Info, Summary, Gauge, Enum, Metric
from prometheus_client.metrics import MetricWrapperBase
from prometheus_client.registry import REGISTRY

from q2_sdk.core.configuration import settings
from q2_sdk.core.non_http_handlers.udp import writer as udp_writer
from q2_sdk.version import __version__
from q2_sdk.models.version import Version

EXISTING_METRICS = {}


[docs] class MetricType(PythonEnum): """Available metric types according to Prometheus""" Counter = Counter Histogram = Histogram Info = Info Summary = Summary Gauge = Gauge Enum = Enum Metric = Metric
[docs] def get_metric( metric_type: MetricType, name: str, documentation: str, labels: dict, namespace="", subsystem="", unit="", registry=REGISTRY, chain: Optional[dict] = None, is_parent=False, **kwargs, ) -> MetricWrapperBase: """ Will return an instance of metric_type with given parameters as inputs, but will return an existing one if it already exists, creating a new one only if it's the first time that metric combination has been seen. Any key/value pairs in kwargs will be passed through to the underlying MetricType in prometheus_client.metrics (such as buckets for Histogram) """ if chain is None: chain = {} if isinstance(metric_type, str): metric_type = MetricType[metric_type] if settings.TEST_MODE: from q2_sdk.tools.testing.models import MetricMock return MetricMock() if settings.FORK_REQUESTS and not is_parent: msg = { "metric_type": metric_type.name, "name": name, "documentation": documentation, "labels": labels, "namespace": namespace, "subsystem": subsystem, "unit": unit, "chain": chain, **kwargs, } udp_writer.send_msg(udp_writer.MsgType.Metrics, msg) return service_name = settings.SERVICE_NAME or settings.HOSTNAME labels.update({"caliper_service": service_name}) EXISTING_METRICS.setdefault(metric_type.name, {name: None}) these_metrics = EXISTING_METRICS[metric_type.name] if not these_metrics.get(name): these_metrics[name] = metric_type.value( name, documentation, labelnames=tuple(labels.keys()), namespace=namespace, subsystem=subsystem, unit=unit, registry=registry, **kwargs, ) metric = these_metrics[name].labels(**labels) chain_op = chain.get("op") if chain_op == "inc": metric.inc() elif chain_op == "dec": metric.dec() elif chain_op == "observe": metric.observe(*chain["params"]) return metric
VERSION = Version(__version__) Info( "caliper", "Information about the running SDK server", labelnames=("version", "major", "minor", "patch", "fork_mode"), ).labels( VERSION.version_string, VERSION.major, VERSION.minor, VERSION.patch, settings.FORK_REQUESTS, )