Here it is :

import com.codahale.metrics.SlidingWindowReservoir;
import in.dailyhunt.cis.enrichments.datatype.BasicInfoTuple;
import in.dailyhunt.cis.enrichments.datatype.SinkTuple;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.util.Collector;


public class DBFlatMap extends RichFlatMapFunction<BasicInfoTuple,
SinkTuple> {

        private transient Meter meter;
        private transient org.apache.flink.metrics.Histogram histogram;


        @Override
        public void open(Configuration parameters) throws Exception {

                /*
                  some app specific code
                 */

                com.codahale.metrics.Meter meter1 = new 
com.codahale.metrics.Meter();
                this.meter = getRuntimeContext()
                                .getMetricGroup()
                                .meter("myMeter", new 
DropwizardMeterWrapper(meter1));

                com.codahale.metrics.Histogram histogram1 =
                                new com.codahale.metrics.Histogram(new 
SlidingWindowReservoir(500));

                this.histogram = getRuntimeContext()
                                .getMetricGroup()
                                .histogram("myHistogram", new 
DropwizardHistogramWrapper(histogram1));

        }

        @Override
        public void flatMap(BasicInfoTuple in, Collector<SinkTuple> out) throws
Exception {
                long start = System.currentTimeMillis();
                incrementCounter("input-in-group-and-KeyElect-Flow",
this.getRuntimeContext());
                this.dbOperations();
                addMetricData(start, System.currentTimeMillis());
                this.meter.markEvent();
        }

        private void dbOperations() {
                // Db Operation and some app logic
        }

        public void incrementCounter(String counterName, RuntimeContext
runtimeContext) {
                if (runtimeContext == null) {
                        return;
                }

                LongCounter lc = runtimeContext.getLongCounter(counterName);
                if (lc == null) {
                        lc = new LongCounter();
                        runtimeContext.addAccumulator(counterName, lc);
                }
                lc.add(1L);
        }


        private void addMetricData(long startTime, long endTime) {
                final long opTimeInSec = (endTime - startTime) / 1000;
                this.histogram.update(opTimeInSec);
                getRuntimeContext().getMetricGroup()
                                .gauge("DbOpGauge", (Gauge<String>) () -> 
String.valueOf(opTimeInSec));

        }


}




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Gauge-Metric-is-not-getting-updated-on-Job-Dashboard-tp13842p13888.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to