Is there any way to make a custom collector work within an aiohttp application 
(or any other way to convert a JSON object to Prometheus metrics in an aiohttp 
application)?

I'm trying to implement a custom collector according to 
https://github.com/prometheus/client_python#custom-collectors while fetching 
the data via *`await session.get(...)`*. The only solution I have found is to 
change the *`def collect()`* method to *`async def collect()`* in the custom 
collector class, override the registry methods that use synchronous loops with 
asynchronous ones, and re-implement the *`generate_latest`* function.

*the overrides:*
```
class CustomRegistry(CollectorRegistry):
    """Registry variant whose collectors may be asynchronous generators.

    ``collect``, ``register`` and ``_get_names`` are async counterparts of
    the same-named registry methods, so a collector can ``await`` I/O inside
    its ``collect()``.  Plain (synchronous) iterables returned by a
    collector's ``collect()`` or ``describe()`` are accepted as well.

    NOTE(review): ``register`` is a coroutine here, so every caller must
    ``await`` it; confirm nothing still calls it synchronously.
    """

    @staticmethod
    async def _iter_metrics(produced):
        """Yield each metric from ``produced``.

        ``produced`` may be an async iterable (e.g. the result of an
        ``async def collect()`` generator) or a plain iterable (e.g. a list
        returned by a synchronous ``describe()``).
        """
        if hasattr(produced, '__aiter__'):
            async for metric in produced:
                yield metric
        else:
            for metric in produced:
                yield metric

    async def collect(self):
        """Yields metrics from the collectors in the registry."""
        ti = None
        # Only the snapshot is taken under the lock; no await happens while
        # it is held, so slow collectors cannot block other lock users.
        with self._lock:
            collectors = copy.copy(self._collector_to_names)
            if self._target_info:
                ti = self._target_info_metric()
        if ti:
            yield ti
        for collector in collectors:
            async for metric in self._iter_metrics(collector.collect()):
                yield metric

    async def register(self, collector):
        """Add a collector to the registry.

        Raises:
            ValueError: if any timeseries name produced by ``collector`` is
                already registered.
        """
        # Resolve the names BEFORE taking the lock: with auto-describe this
        # may run the collector's (possibly network-bound) collect().
        # ``self._lock`` is entered with a plain ``with`` (presumably a
        # threading lock), so holding it across an await would stall any
        # other coroutine on this loop that tries to acquire it.
        names = await self._get_names(collector)
        with self._lock:
            duplicates = set(self._names_to_collectors).intersection(names)
            if duplicates:
                raise ValueError(
                    'Duplicated timeseries in CollectorRegistry: {0}'.format(
                        duplicates))
            for name in names:
                self._names_to_collectors[name] = collector
            self._collector_to_names[collector] = names

    async def _get_names(self, collector):
        """Get names of timeseries the collector produces.

        Uses ``collector.describe`` when it exists; otherwise, if
        ``self._auto_describe`` is set, falls back to ``collector.collect``.
        Returns an empty list when neither source is available.
        """
        # If there's a describe function, use it.
        desc_func = getattr(collector, 'describe', None)
        # Otherwise, if auto describe is enabled use the collect function.
        if not desc_func and self._auto_describe:
            desc_func = collector.collect

        if not desc_func:
            return []

        # Sample-name suffixes per metric type; any other type contributes
        # just the bare metric name.
        type_suffixes = {
            'counter': ['_total', '_created'],
            'summary': ['', '_sum', '_count', '_created'],
            'histogram': ['_bucket', '_sum', '_count', '_created'],
            'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
            'info': ['_info'],
        }
        result = []
        async for metric in self._iter_metrics(desc_func()):
            for suffix in type_suffixes.get(metric.type, ['']):
                result.append(metric.name + suffix)
        return result


def sample_line(line):
    """Render one sample as a single line of the text exposition format.

    The line is ``<name>[{labels}] <value>[ <timestamp-ms>]`` followed by a
    newline.  Labels are emitted sorted by key with backslash, newline and
    double-quote escaped in their values; the timestamp, when present, is
    converted from seconds to integer milliseconds.
    """
    def _escape(text):
        # Escape order matters: backslashes first, so the backslashes added
        # for newlines and quotes are not doubled again.
        return text.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')

    labelstr = ''
    if line.labels:
        pairs = []
        for key, raw in sorted(line.labels.items()):
            pairs.append('{0}="{1}"'.format(key, _escape(raw)))
        labelstr = '{{{0}}}'.format(','.join(pairs))

    if line.timestamp is None:
        timestamp = ''
    else:
        # Convert to milliseconds.
        millis = int(float(line.timestamp) * 1000)
        timestamp = ' {0:d}'.format(millis)

    return '{0}{1} {2}{3}\n'.format(
        line.name, labelstr, floatToGoString(line.value), timestamp)


async def generate_latest(registry):
    """Return the registry's metrics in the text format, as UTF-8 bytes.

    Args:
        registry: object whose ``collect()`` is an async iterable of metric
            families exposing ``name``, ``type``, ``documentation`` and
            ``samples``.

    Returns:
        bytes: for every metric a ``# HELP`` and ``# TYPE`` line followed by
        its sample lines; samples named ``<metric>_created``,
        ``<metric>_gsum`` or ``<metric>_gcount`` are emitted afterwards as
        separate gauges.

    Raises:
        Exception: any error raised while rendering a metric is re-raised
            with the offending metric appended to its ``args``.
    """
    output = []
    async for metric in registry.collect():
        try:
            mname = metric.name
            mtype = metric.type
            # Munging from OpenMetrics into Prometheus format.
            if mtype == 'counter':
                mname = mname + '_total'
            elif mtype == 'info':
                mname = mname + '_info'
                mtype = 'gauge'
            elif mtype == 'stateset':
                mtype = 'gauge'
            elif mtype == 'gaugehistogram':
                # A gauge histogram is really a gauge,
                # but this captures the structure better.
                mtype = 'histogram'
            elif mtype == 'unknown':
                mtype = 'untyped'

            # Escaped once and reused for the per-suffix HELP lines below.
            help_text = metric.documentation.replace(
                '\\', r'\\').replace('\n', r'\n')
            output.append('# HELP {0} {1}\n'.format(mname, help_text))
            output.append('# TYPE {0} {1}\n'.format(mname, mtype))

            om_samples = {}
            for s in metric.samples:
                for suffix in ['_created', '_gsum', '_gcount']:
                    if s.name == metric.name + suffix:
                        # OpenMetrics specific sample, put in a gauge at
                        # the end.
                        om_samples.setdefault(suffix, []).append(
                            sample_line(s))
                        break
                else:
                    output.append(sample_line(s))
        except Exception as exception:
            # Attach the metric so the failure can be traced to its source.
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        for suffix, lines in sorted(om_samples.items()):
            output.append('# HELP {0}{1} {2}\n'.format(
                metric.name, suffix, help_text))
            output.append('# TYPE {0}{1} gauge\n'.format(metric.name, suffix))
            output.extend(lines)
    return ''.join(output).encode('utf-8')
```

*the custom collector class:*
```
class CustomCollector(object):
    """Collector that turns a remote JSON metrics document into metric families.

    The document fetched from ``metrics_url`` is read as::

        {"metrics": {<name>: {"kind": "Gauge" | "Timer" | "Counter",
                              "values": [{"tags": [{"key": ..., "value": ...}],
                                          "values": [{"v": ..., "t": ...}]}]}}}
    """

    def __init__(self, app):
        """Take the HTTP session and metrics URL from the ``app`` mapping."""
        self.session = app['session']
        self.metrics_url = app['metrics_url']

    async def collect(self):
        """Fetch the JSON document and yield one metric family per entry.

        Names are normalised to snake_case ('.' and '-' become '_', and an
        underscore is inserted before each run of capitals).  Entries whose
        ``kind`` is not Gauge, Timer or Counter are skipped, as are value
        entries with an empty ``values`` list.
        """
        # Fetch inside ``async with`` so the response is released even if
        # decoding fails; the payload is fully read before any metric is
        # yielded, so the connection is not held while the consumer iterates.
        async with self.session.get(self.metrics_url) as resp:
            payload = await resp.json()

        for raw_name, v in payload['metrics'].items():
            name = raw_name.replace('.', '_').replace('-', '_')
            name = re.sub('(?!^)([A-Z]+)', r'_\1', name).lower()

            kind = v['kind']
            if kind == 'Gauge':
                m = GaugeMetricFamily(name, name, labels=[])
            elif kind == 'Timer':
                m = SummaryMetricFamily(name, name, labels=[])
            elif kind == 'Counter':
                m = CounterMetricFamily(name, name, labels=[])
            else:
                # Unknown kind: without this branch ``m`` would be unbound
                # (first entry) or still hold the previous entry's family,
                # which would then be polluted and yielded a second time.
                continue

            for i in v['values']:
                points = i['values']
                if not points:
                    # Nothing recorded for this tag set.
                    continue
                labels = {tag['key']: tag['value'] for tag in i['tags']}
                # Only the first data point of each tag set is exported.
                first = points[0]
                m.add_sample(name, labels, first['v'], timestamp=first['t'])
            yield m

-- 
You received this message because you are subscribed to the Google Groups 
"Prometheus Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/prometheus-users/86f0e86e-a3c5-473a-bbeb-defa80bc817bn%40googlegroups.com.

Reply via email to