Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8879cdec2 -> de83a3fb5


[GOBBLIN-402] Add more metrics for gobblin cluster and fix the getJobs slowness issue

Closes #2276 from yukuai518/morem


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/de83a3fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/de83a3fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/de83a3fb

Branch: refs/heads/master
Commit: de83a3fb5f2644e9657b546c7415d1e8f5336a2c
Parents: 8879cde
Author: Kuai Yu <k...@linkedin.com>
Authored: Fri Feb 2 13:25:53 2018 -0800
Committer: Hung Tran <hut...@linkedin.com>
Committed: Fri Feb 2 13:25:53 2018 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinClusterManager.java  |  6 +-
 .../cluster/GobblinHelixJobScheduler.java       | 33 ++++++-
 .../StreamingJobConfigurationManager.java       |  3 +
 .../service/StreamingKafkaSpecConsumer.java     | 92 +++++++++++++++++++-
 .../apache/gobblin/runtime/api/JobCatalog.java  |  7 +-
 .../runtime/api/JobExecutionLauncher.java       |  6 ++
 .../runtime/job_monitor/KafkaJobMonitor.java    |  3 +
 7 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 77e511e..3393df6 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -153,6 +153,9 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
   private MutableJobCatalog jobCatalog;
   @Getter
   private GobblinHelixJobScheduler jobScheduler;
+  @Getter
+  private JobConfigurationManager jobConfigurationManager;
+  
   private final String clusterName;
   private final Config config;
   private final MetricContext metricContext;
@@ -209,7 +212,8 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
     this.jobScheduler = buildGobblinHelixJobScheduler(config, this.appWorkDir, 
getMetadataTags(clusterName, applicationId),
         schedulerService);
     this.applicationLauncher.addService(this.jobScheduler);
-    this.applicationLauncher.addService(buildJobConfigurationManager(config));
+    this.jobConfigurationManager = buildJobConfigurationManager(config);
+    this.applicationLauncher.addService(this.jobConfigurationManager);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 36ba542..141e3d1 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
@@ -157,7 +159,17 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     private final ContextAwareTimer timeBeforeJobScheduling;
     private final ContextAwareTimer timeBeforeJobLaunching;
 
+    private final ThreadPoolExecutor threadPoolExecutor;
+    private final ContextAwareGauge<Integer> executorActiveCount;
+    private final ContextAwareGauge<Integer> executorMaximumPoolSize;
+    private final ContextAwareGauge<Integer> executorPoolSize;
+    private final ContextAwareGauge<Integer> executorCorePoolSize;
+    private final ContextAwareGauge<Integer> executorQueueSize;
+
     public Metrics(final MetricContext metricContext) {
+      // Thread executor reference from job scheduler
+      this.threadPoolExecutor = 
(ThreadPoolExecutor)GobblinHelixJobScheduler.this.jobExecutor;
+
       // All historical counters
       this.totalJobsLaunched = new AtomicLong(0);
       this.totalJobsCompleted = new AtomicLong(0);
@@ -177,6 +189,13 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
       this.timeForJobFailure = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1,
 TimeUnit.MINUTES);
       this.timeBeforeJobScheduling = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING,
 1, TimeUnit.MINUTES);
       this.timeBeforeJobLaunching = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING,
 1, TimeUnit.MINUTES);
+
+      // executor metrics
+      this.executorActiveCount = 
metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT,
 ()->this.threadPoolExecutor.getActiveCount());
+      this.executorMaximumPoolSize = 
metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE,
 ()->this.threadPoolExecutor.getMaximumPoolSize());
+      this.executorPoolSize = 
metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE,
 ()->this.threadPoolExecutor.getPoolSize());
+      this.executorCorePoolSize =  
metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE,
 ()->this.threadPoolExecutor.getCorePoolSize());
+      this.executorQueueSize = 
metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE,
 ()->this.threadPoolExecutor.getQueue().size());
     }
 
     private void updateTimeBeforeJobScheduling (Properties jobConfig) {
@@ -196,7 +215,19 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
     @Override
     public Collection<ContextAwareGauge<?>> getGauges() {
-      return ImmutableList.of(numJobsRunning, numJobsLaunched, 
numJobsCompleted, numJobsCommitted, numJobsFailed, numJobsCancelled);
+      List<ContextAwareGauge<?>> list = Lists.newArrayList();
+      list.add(numJobsRunning);
+      list.add(numJobsLaunched);
+      list.add(numJobsCompleted);
+      list.add(numJobsCommitted);
+      list.add(numJobsFailed);
+      list.add(numJobsCancelled);
+      list.add(executorActiveCount);
+      list.add(executorMaximumPoolSize);
+      list.add(executorPoolSize);
+      list.add(executorCorePoolSize);
+      list.add(executorQueueSize);
+      return list;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
index 849dd6a..3c01704 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java
@@ -45,6 +45,8 @@ import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.runtime.api.SpecConsumer;
 
+import lombok.Getter;
+
 
 /**
  * A {@link JobConfigurationManager} that fetches job specs from a {@link 
SpecConsumer} in a loop
@@ -56,6 +58,7 @@ public class StreamingJobConfigurationManager extends 
JobConfigurationManager {
 
   private final ExecutorService fetchJobSpecExecutor;
 
+  @Getter
   private final SpecConsumer specConsumer;
 
   private final long stopTimeoutSeconds;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 23966e9..4764603 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -21,18 +21,29 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+
 import org.slf4j.Logger;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -49,6 +60,9 @@ import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static 
org.apache.gobblin.service.SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY;
+
+import javax.annotation.Nonnull;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -56,12 +70,16 @@ import lombok.extern.slf4j.Slf4j;
  * SpecConsumer that consumes from kafka in a streaming manner
  * Implemented {@link AbstractIdleService} for starting up and shutting down.
  */
-public class StreamingKafkaSpecConsumer extends AbstractIdleService implements 
SpecConsumer<Spec>, Closeable {
+public class StreamingKafkaSpecConsumer extends AbstractIdleService implements 
SpecConsumer<Spec>, Closeable, StandardMetricsBridge {
   public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 
"spec.StreamingBlockingQueueSize";
   private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100;
+  @Getter
   private final AvroJobSpecKafkaJobMonitor _jobMonitor;
   private final BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> 
_jobSpecQueue;
   private final MutableJobCatalog _jobCatalog;
+  private final MetricContext _metricContext;
+  private final Metrics _metrics;
+  private final boolean _isInstrumentedEnabled;
   public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog 
jobCatalog, Optional<Logger> log) {
     String topic = config.getString(SPEC_KAFKA_TOPICS_KEY);
     Config defaults = 
ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, 
topic,
@@ -73,10 +91,12 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
     } catch (IOException e) {
       throw new RuntimeException("Could not create job monitor", e);
     }
-
+    _isInstrumentedEnabled = 
GobblinMetrics.isEnabled(ConfigUtils.configToProperties(config));
     _jobCatalog = jobCatalog;
     _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, 
"SPEC_STREAMING_BLOCKING_QUEUE_SIZE",
         DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE));
+    _metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.getClass());
+    _metrics = new Metrics(this._metricContext);
   }
 
   public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog 
jobCatalog, Logger log) {
@@ -98,7 +118,7 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
 
     try {
       Pair<SpecExecutor.Verb, Spec> specPair = _jobSpecQueue.take();
-
+      _metrics.jobSpecDeqCount.incrementAndGet();
       do {
         changesSpecs.add(specPair);
 
@@ -145,6 +165,7 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
 
       try {
         _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, 
Spec>(SpecExecutor.Verb.ADD, addedJob));
+        _metrics.jobSpecEnqCount.incrementAndGet();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -159,6 +180,7 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
         
jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props);
 
         _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, 
Spec>(SpecExecutor.Verb.DELETE, jobSpecBuilder.build()));
+        _metrics.jobSpecEnqCount.incrementAndGet();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -169,9 +191,73 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
 
       try {
         _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, 
Spec>(SpecExecutor.Verb.UPDATE, updatedJob));
+        _metrics.jobSpecEnqCount.incrementAndGet();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }
   }
+
+  private class Metrics extends StandardMetricsBridge.StandardMetrics {
+    private ContextAwareGauge<Integer> jobSpecQueueSize;
+    private ContextAwareGauge<Long> jobSpecEnq;
+    private ContextAwareGauge<Long> jobSpecDeq;
+    private ContextAwareGauge<Long> jobSpecConsumed;
+    private AtomicLong jobSpecEnqCount = new AtomicLong(0);
+    private AtomicLong jobSpecDeqCount = new AtomicLong(0);
+
+    public static final String SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE = 
"specConsumerJobSpecQueueSize";
+    public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = 
"specConsumerJobSpecEnq";
+    public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = 
"specConsumerJobSpecDeq";
+    public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = 
"specConsumerJobSpecConsumed";
+
+
+    public Metrics(MetricContext context) {
+      this.jobSpecQueueSize = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, 
()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size());
+      this.jobSpecEnq = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, 
()->jobSpecEnqCount.get());
+      this.jobSpecDeq = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, 
()->jobSpecDeqCount.get());
+      this.jobSpecConsumed = 
context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
+          
()->StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() + 
StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount());
+    }
+
+    public Collection<ContextAwareGauge<?>> getGauges() {
+      List list = Lists.newArrayList();
+      list.add(jobSpecQueueSize);
+      list.add(jobSpecEnq);
+      list.add(jobSpecDeq);
+      list.add(jobSpecConsumed);
+      return list;
+    }
+  }
+
+  @Override
+  public StandardMetrics getStandardMetrics() {
+    throw new UnsupportedOperationException("Implemented in sub class");
+  }
+
+  @Nonnull
+  @Override
+  public MetricContext getMetricContext() {
+    return _metricContext;
+  }
+
+  @Override
+  public boolean isInstrumentationEnabled() {
+    return _isInstrumentedEnabled;
+  }
+
+  @Override
+  public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State 
state) {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void switchMetricContext(List<Tag<?>> tags) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void switchMetricContext(MetricContext context) {
+    throw new UnsupportedOperationException();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 950b86d..42ecef3 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -95,12 +95,7 @@ public interface JobCatalog extends 
JobCatalogListenersContainer, Instrumentable
       this.totalAddCalls = 
metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, 
()->this.totalAddedJobs.get());
       this.totalUpdateCalls = 
metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, 
()->this.totalUpdatedJobs.get());
       this.totalDeleteCalls = 
metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, 
()->this.totalDeletedJobs.get());
-      this.numActiveJobs = 
metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{
-          long startTime = System.currentTimeMillis();
-          int size = jobCatalog.getJobs().size();
-          updateGetJobTime(startTime);
-          return size;
-      });
+      this.numActiveJobs = 
metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, 
()->(int)(totalAddedJobs.get() - totalDeletedJobs.get()));
     }
 
     public void updateGetJobTime(long startTime) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index a7e5878..3f50ee7 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -49,6 +49,12 @@ public interface JobExecutionLauncher extends Instrumentable 
{
     public static final String TIMER_BEFORE_JOB_SCHEDULING = 
"timerBeforeJobScheduling";
     public static final String TIMER_BEFORE_JOB_LAUNCHING = 
"timerBeforeJobLaunching";
 
+    public static final String EXECUTOR_ACTIVE_COUNT = "executorActiveCount";
+    public static final String EXECUTOR_MAX_POOL_SIZE = 
"executorMaximumPoolSize";
+    public static final String EXECUTOR_POOL_SIZE = "executorPoolSize";
+    public static final String EXECUTOR_CORE_POOL_SIZE = 
"executorCorePoolSize";
+    public static final String EXECUTOR_QUEUE_SIZE = "executorQueueSize";
+
     public static final String TRACKING_EVENT_NAME = 
"JobExecutionLauncherEvent";
     public static final String JOB_EXECID_META = "jobExecId";
     public static final String JOB_LAUNCHED_OPERATION_TYPE = "JobLaunched";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 8181c16..ba79305 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -35,6 +35,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 
 import kafka.message.MessageAndMetadata;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -51,7 +52,9 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
   public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
 
   private final MutableJobCatalog jobCatalog;
+  @Getter
   private Counter newSpecs;
+  @Getter
   private Counter remmovedSpecs;
 
   /**

Reply via email to