Here it is:
import com.codahale.metrics.SlidingWindowReservoir; import in.dailyhunt.cis.enrichments.datatype.BasicInfoTuple; import in.dailyhunt.cis.enrichments.datatype.SinkTuple; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper; import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.util.Collector; public class DBFlatMap extends RichFlatMapFunction<BasicInfoTuple, SinkTuple> { private transient Meter meter; private transient org.apache.flink.metrics.Histogram histogram; @Override public void open(Configuration parameters) throws Exception { /* some app specific code */ com.codahale.metrics.Meter meter1 = new com.codahale.metrics.Meter(); this.meter = getRuntimeContext() .getMetricGroup() .meter("myMeter", new DropwizardMeterWrapper(meter1)); com.codahale.metrics.Histogram histogram1 = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)); this.histogram = getRuntimeContext() .getMetricGroup() .histogram("myHistogram", new DropwizardHistogramWrapper(histogram1)); } @Override public void flatMap(BasicInfoTuple in, Collector<SinkTuple> out) throws Exception { long start = System.currentTimeMillis(); incrementCounter("input-in-group-and-KeyElect-Flow", this.getRuntimeContext()); this.dbOperations(); addMetricData(start, System.currentTimeMillis()); this.meter.markEvent(); } private void dbOperations() { // Db Operation and some app logic } public void incrementCounter(String counterName, RuntimeContext runtimeContext) { if (runtimeContext == null) { return; } LongCounter lc = runtimeContext.getLongCounter(counterName); if (lc == null) { lc = new LongCounter(); 
runtimeContext.addAccumulator(counterName, lc); } lc.add(1L); } private void addMetricData(long startTime, long endTime) { final long opTimeInSec = (endTime - startTime) / 1000; this.histogram.update(opTimeInSec); getRuntimeContext().getMetricGroup() .gauge("DbOpGauge", (Gauge<String>) () -> String.valueOf(opTimeInSec)); } } -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Gauge-Metric-is-not-getting-updated-on-Job-Dashboard-tp13842p13888.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.