Hi,

I'm interested in this question.
Do you have any experience to share?

I'm currently using aioprometheus, but the metric exposure is blocking:
https://github.com/claws/aioprometheus/issues/98

Looking at the official client, it seems that they added async ASGI metric 
exposure. But I don't know if their metrics update will then be blocking.
I didn't try it yet.
See https://github.com/prometheus/client_python/pull/512 

I discover this package but didn't try it yet.
https://github.com/Llandy3d/pytheus/issues/55

And there is also this prometheus-async, I asked a question there too:
https://github.com/hynek/prometheus-async/issues/59


On Sunday 11 October 2020 at 23:14:34 UTC+1 Alexander wrote:

> Is there any way to make custom collector work withing aiohttp application 
> (or any other way to convert json object to prometheus metrics in aiohttp 
> application)?
>
> I'm trying to implement custom collector according to 
> https://github.com/prometheus/client_python#custom-collectors and 
> fetching data via *`await session.get(...)`*. The only solution I found 
> is to change *`def collect()`* method to *`async def collect()`* in 
> custom collector class and override registry methods with sync loops to 
> async one and also re-implement *`generate_latest`* function.
>
> *the overrides:*
> ```
> class CustomRegistry(CollectorRegistry):
>     async def collect(self):
>         """Yields metrics from the collectors in the registry."""
>         collectors = None
>         ti = None
>         with self._lock:
>             collectors = copy.copy(self._collector_to_names)
>             if self._target_info:
>                 ti = self._target_info_metric()
>         if ti:
>             yield ti
>         for collector in collectors:
>             async for metric in collector.collect():
>                 yield metric
>
>     async def register(self, collector):
>         """Add a collector to the registry."""
>         with self._lock:
>             names = await self._get_names(collector)
>             duplicates = set(self._names_to_collectors).intersection(names)
>             if duplicates:
>                 raise ValueError(
>                     'Duplicated timeseries in CollectorRegistry: 
> {0}'.format(
>                         duplicates))
>             for name in names:
>                 self._names_to_collectors[name] = collector
>             self._collector_to_names[collector] = names
>
>     async def _get_names(self, collector):
>         """Get names of timeseries the collector produces."""
>         desc_func = None
>         # If there's a describe function, use it.
>         try:
>             desc_func = collector.describe
>         except AttributeError:
>             pass
>         # Otherwise, if auto describe is enabled use the collect function.
>         if not desc_func and self._auto_describe:
>             desc_func = collector.collect
>
>         if not desc_func:
>             return []
>
>         result = []
>         type_suffixes = {
>             'counter': ['_total', '_created'],
>             'summary': ['', '_sum', '_count', '_created'],
>             'histogram': ['_bucket', '_sum', '_count', '_created'],
>             'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
>             'info': ['_info'],
>         }
>         async for metric in desc_func():
>             for suffix in type_suffixes.get(metric.type, ['']):
>                 result.append(metric.name + suffix)
>         return result
>
>
> def sample_line(line):
>     if line.labels:
>         labelstr = '{{{0}}}'.format(','.join(
>             ['{0}="{1}"'.format(
>                 k, v.replace('\\', r'\\').replace('\n', 
> r'\n').replace('"', r'\"'))
>                 for k, v in sorted(line.labels.items())]))
>     else:
>         labelstr = ''
>     timestamp = ''
>     if line.timestamp is not None:
>         # Convert to milliseconds.
>         timestamp = ' {0:d}'.format(int(float(line.timestamp) * 1000))
>     return '{0}{1} {2}{3}\n'.format(
>         line.name, labelstr, floatToGoString(line.value), timestamp)
>
>
> async def generate_latest(registry):
>     """Returns the metrics from the registry in latest text format as a 
> string."""
>     output = []
>     async for metric in registry.collect():
>         try:
>             mname = metric.name
>             mtype = metric.type
>             # Munging from OpenMetrics into Prometheus format.
>             if mtype == 'counter':
>                 mname = mname + '_total'
>             elif mtype == 'info':
>                 mname = mname + '_info'
>                 mtype = 'gauge'
>             elif mtype == 'stateset':
>                 mtype = 'gauge'
>             elif mtype == 'gaugehistogram':
>                 # A gauge histogram is really a gauge,
>                 # but this captures the structure better.
>                 mtype = 'histogram'
>             elif mtype == 'unknown':
>                 mtype = 'untyped'
>
>             output.append('# HELP {0} {1}\n'.format(
>                 mname, metric.documentation.replace('\\', 
> r'\\').replace('\n', r'\n')))
>             output.append('# TYPE {0} {1}\n'.format(mname, mtype))
>
>             om_samples = {}
>             for s in metric.samples:
>                 for suffix in ['_created', '_gsum', '_gcount']:
>                     if s.name == metric.name + suffix:
>                         # OpenMetrics specific sample, put in a gauge at 
> the end.
>                         om_samples.setdefault(suffix, 
> []).append(sample_line(s))
>                         break
>                 else:
>                     output.append(sample_line(s))
>         except Exception as exception:
>             exception.args = (exception.args or ('',)) + (metric,)
>             raise
>
>         for suffix, lines in sorted(om_samples.items()):
>             output.append('# HELP {0}{1} {2}\n'.format(metric.name, 
> suffix, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
>             output.append('# TYPE {0}{1} gauge\n'.format(metric.name, 
> suffix))
>             output.extend(lines)
>     return ''.join(output).encode('utf-8')
> ```
>
> *the custom collector class:*
> ```
> class CustomCollector(object):
>     def __init__(self, app):
>         self.session = app['session']
>         self.metrics_url = app['metrics_url']
>
>     async def collect(self):
>         resp = await self.session.get(self.metrics_url)
>         payload = await resp.json()
>         for name, v in payload['metrics'].items():
>             name = name.replace('.', '_')
>             name = name.replace('-', '_')
>             name = re.sub('(?!^)([A-Z]+)', r'_\1', name).lower()
>             kind = v['kind']
>             if kind == 'Gauge':
>                 m = GaugeMetricFamily(name, name, labels=[])
>             if kind == 'Timer':
>                 m = SummaryMetricFamily(name, name, labels=[])
>             if kind == 'Counter':
>                 m = CounterMetricFamily(name, name, labels=[])
>             for i in v['values']:
>                 tags = i['tags']
>                 labels = {tag['key']: tag['value'] for tag in tags}
>                 value = i['values'][0]['v']
>                 timestamp = i['values'][0]['t']
>                 m.add_sample(name, labels, value, timestamp=timestamp)
>             yield m
> ```

-- 
You received this message because you are subscribed to the Google Groups 
"Prometheus Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to prometheus-users+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/prometheus-users/2c36fc4e-6b53-4ff0-a147-006d4423f8a2n%40googlegroups.com.

Reply via email to