"""
Prometheus is a monitoring solution we use at Q2 for keeping track
of the stats that matter most to a running service.
https://prometheus.io/
There are a few gotchas in working with Prometheus that we've attempted
to abstract away with this module. Typical usage looks something like
this:

.. code-block:: python

    prometheus.get_metric(
        MetricType.Counter,
        'caliper_endpoints',
        'Number of times called',
        labels={'endpoint': self.extension_name}
    ).inc()
"""
from enum import Enum as PythonEnum
from typing import Optional
from prometheus_client import Counter, Histogram, Info, Summary, Gauge, Enum, Metric
from prometheus_client.metrics import MetricWrapperBase
from prometheus_client.registry import REGISTRY
from q2_sdk.core.configuration import settings
from q2_sdk.core.non_http_handlers.udp import writer as udp_writer
from q2_sdk.version import __version__
from q2_sdk.models.version import Version
# Process-wide cache of metrics already created by get_metric, keyed first by
# MetricType member name and then by metric name, so that each metric is only
# constructed once.
EXISTING_METRICS = {}
[docs]
class MetricType(PythonEnum):
    """
    Metric types that can be requested from :func:`get_metric`.

    Each member is named after, and has as its value, the class of the
    same name imported from ``prometheus_client``.
    """

    # The right-hand side of each assignment resolves to the module-level
    # import, because the member name is not yet bound in the class body
    # at that point.
    Counter = Counter
    Histogram = Histogram
    Info = Info
    Summary = Summary
    Gauge = Gauge
    Enum = Enum
    Metric = Metric
[docs]
def get_metric(
    metric_type: MetricType,
    name: str,
    documentation: str,
    labels: dict,
    namespace="",
    subsystem="",
    unit="",
    registry=REGISTRY,
    chain: Optional[dict] = None,
    is_parent=False,
    **kwargs,
) -> MetricWrapperBase:
    """
    Will return an instance of metric_type with given parameters
    as inputs, but will return an existing one if it already exists,
    creating a new one only if it's the first time that metric
    combination has been seen.

    Any key/value pairs in kwargs will be passed through to the underlying
    MetricType in prometheus_client.metrics (such as buckets for Histogram)

    :param metric_type: A ``MetricType`` member, or the name of one as a string
    :param name: Metric name
    :param documentation: Help text for the metric
    :param labels: Label names mapped to their values. The dictionary passed
        in is not modified; a ``caliper_service`` label is added to a copy.
    :param namespace: Passed through to the underlying metric class
    :param subsystem: Passed through to the underlying metric class
    :param unit: Passed through to the underlying metric class
    :param registry: Registry a newly created metric is registered with
    :param chain: Optional operation to apply before returning. ``{"op": "inc"}``
        and ``{"op": "dec"}`` call the method of that name;
        ``{"op": "observe", "params": [...]}`` calls ``observe(*params)``.
    :param is_parent: When True, the metric is handled in this process even
        if ``settings.FORK_REQUESTS`` is set
    :return: The labelled metric. In ``settings.TEST_MODE`` a ``MetricMock``
        is returned instead. When ``settings.FORK_REQUESTS`` is set and
        ``is_parent`` is False, the request is sent over UDP and ``None`` is
        returned, so use ``chain`` rather than calling methods on the result.
    """
    if chain is None:
        chain = {}
    if isinstance(metric_type, str):
        metric_type = MetricType[metric_type]

    if settings.TEST_MODE:
        from q2_sdk.tools.testing.models import MetricMock

        return MetricMock()

    if settings.FORK_REQUESTS and not is_parent:
        # Hand the request off via the UDP writer instead of touching the
        # registry in this process.
        msg = {
            "metric_type": metric_type.name,
            "name": name,
            "documentation": documentation,
            "labels": labels,
            "namespace": namespace,
            "subsystem": subsystem,
            "unit": unit,
            "chain": chain,
            **kwargs,
        }
        udp_writer.send_msg(udp_writer.MsgType.Metrics, msg)
        return None

    service_name = settings.SERVICE_NAME or settings.HOSTNAME
    # Build a new dict rather than calling labels.update(): the caller's
    # dictionary must not gain a "caliper_service" key as a side effect.
    # A missing (None) labels argument is treated as empty.
    labels = {**(labels or {}), "caliper_service": service_name}

    these_metrics = EXISTING_METRICS.setdefault(metric_type.name, {})
    family = these_metrics.get(name)
    if family is None:
        family = metric_type.value(
            name,
            documentation,
            labelnames=tuple(labels.keys()),
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            **kwargs,
        )
        these_metrics[name] = family

    metric = family.labels(**labels)

    chain_op = chain.get("op")
    if chain_op == "inc":
        metric.inc()
    elif chain_op == "dec":
        metric.dec()
    elif chain_op == "observe":
        metric.observe(*chain["params"])
    return metric
# Parsed form of the SDK version string, used for the info metric below.
VERSION = Version(__version__)

# Create the "caliper" Info metric at import time and materialise its single
# labelled child. Label values are passed by keyword so each one is visibly
# paired with its label name.
Info(
    "caliper",
    "Information about the running SDK server",
    labelnames=("version", "major", "minor", "patch", "fork_mode"),
).labels(
    version=VERSION.version_string,
    major=VERSION.major,
    minor=VERSION.minor,
    patch=VERSION.patch,
    fork_mode=settings.FORK_REQUESTS,
)