Hi. I would like to be able to control the metric group used by the Flink Kinesis consumer. As shown in the code below, it creates a separate metric identifier for each stream name and shard ID (in our case, more than 1,000 metric identifiers), so the metrics cannot be aggregated in a Datadog graph:
/**
 * Creates a fresh {@link ShardMetricsReporter} for the given shard and exposes its
 * values as gauges.
 *
 * <p>The gauges are registered on a sub-group of {@code metricGroup} that is scoped
 * first by the stream name and then by the shard id, both taken from the shard
 * state's stream shard handle.
 *
 * @param metricGroup the parent metric group under which the stream/shard group is added
 * @param shardState the shard state supplying the stream name and shard id
 * @return the reporter whose getters back the registered gauges
 */
private static ShardMetricsReporter registerShardMetrics(
        MetricGroup metricGroup, KinesisStreamShardState shardState) {
    final ShardMetricsReporter reporter = new ShardMetricsReporter();

    // First level: one group per stream name.
    final MetricGroup perStreamGroup =
            metricGroup.addGroup(
                    KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
                    shardState.getStreamShardHandle().getStreamName());

    // Second level: one group per shard id within that stream.
    final MetricGroup perShardGroup =
            perStreamGroup.addGroup(
                    KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
                    shardState.getStreamShardHandle().getShard().getShardId());

    // Each gauge reads its value from the reporter via a method reference.
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE,
            reporter::getMillisBehindLatest);
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH,
            reporter::getMaxNumberOfRecordsPerFetch);
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH,
            reporter::getNumberOfAggregatedRecords);
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH,
            reporter::getNumberOfDeaggregatedRecords);
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES,
            reporter::getAverageRecordSizeBytes);
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.BYTES_PER_READ,
            reporter::getBytesPerRead);
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS,
            reporter::getRunLoopTimeNanos);
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ,
            reporter::getLoopFrequencyHz);
    perShardGroup.gauge(
            KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS,
            reporter::getSleepTimeMillis);

    return reporter;
}
Would be happy for your advice. Thanks, Yitzchak.