Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8879cdec2 -> de83a3fb5
[GOBBLIN-402] Add more metrics for gobblin cluster and fix the getJobs slowness issue

Closes #2276 from yukuai518/morem

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/de83a3fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/de83a3fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/de83a3fb

Branch: refs/heads/master
Commit: de83a3fb5f2644e9657b546c7415d1e8f5336a2c
Parents: 8879cde
Author: Kuai Yu <k...@linkedin.com>
Authored: Fri Feb 2 13:25:53 2018 -0800
Committer: Hung Tran <hut...@linkedin.com>
Committed: Fri Feb 2 13:25:53 2018 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinClusterManager.java | 6 +-
 .../cluster/GobblinHelixJobScheduler.java | 33 ++++++-
 .../StreamingJobConfigurationManager.java | 3 +
 .../service/StreamingKafkaSpecConsumer.java | 92 +++++++++++++++++++-
 .../apache/gobblin/runtime/api/JobCatalog.java | 7 +-
 .../runtime/api/JobExecutionLauncher.java | 6 ++
 .../runtime/job_monitor/KafkaJobMonitor.java | 3 +
 7 files changed, 139 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 77e511e..3393df6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -153,6 +153,9 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
 private MutableJobCatalog
jobCatalog; @Getter private GobblinHelixJobScheduler jobScheduler; + @Getter + private JobConfigurationManager jobConfigurationManager; + private final String clusterName; private final Config config; private final MetricContext metricContext; @@ -209,7 +212,8 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri this.jobScheduler = buildGobblinHelixJobScheduler(config, this.appWorkDir, getMetadataTags(clusterName, applicationId), schedulerService); this.applicationLauncher.addService(this.jobScheduler); - this.applicationLauncher.addService(buildJobConfigurationManager(config)); + this.jobConfigurationManager = buildJobConfigurationManager(config); + this.applicationLauncher.addService(this.jobConfigurationManager); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 36ba542..141e3d1 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; @@ -157,7 +159,17 @@ 
public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private final ContextAwareTimer timeBeforeJobScheduling; private final ContextAwareTimer timeBeforeJobLaunching; + private final ThreadPoolExecutor threadPoolExecutor; + private final ContextAwareGauge<Integer> executorActiveCount; + private final ContextAwareGauge<Integer> executorMaximumPoolSize; + private final ContextAwareGauge<Integer> executorPoolSize; + private final ContextAwareGauge<Integer> executorCorePoolSize; + private final ContextAwareGauge<Integer> executorQueueSize; + public Metrics(final MetricContext metricContext) { + // Thread executor reference from job scheduler + this.threadPoolExecutor = (ThreadPoolExecutor)GobblinHelixJobScheduler.this.jobExecutor; + // All historical counters this.totalJobsLaunched = new AtomicLong(0); this.totalJobsCompleted = new AtomicLong(0); @@ -177,6 +189,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES); this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, 1, TimeUnit.MINUTES); this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, 1, TimeUnit.MINUTES); + + // executor metrics + this.executorActiveCount = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount()); + this.executorMaximumPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize()); + this.executorPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize()); + this.executorCorePoolSize = 
metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize()); + this.executorQueueSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size()); } private void updateTimeBeforeJobScheduling (Properties jobConfig) { @@ -196,7 +215,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public Collection<ContextAwareGauge<?>> getGauges() { - return ImmutableList.of(numJobsRunning, numJobsLaunched, numJobsCompleted, numJobsCommitted, numJobsFailed, numJobsCancelled); + List<ContextAwareGauge<?>> list = Lists.newArrayList(); + list.add(numJobsRunning); + list.add(numJobsLaunched); + list.add(numJobsCompleted); + list.add(numJobsCommitted); + list.add(numJobsFailed); + list.add(numJobsCancelled); + list.add(executorActiveCount); + list.add(executorMaximumPoolSize); + list.add(executorPoolSize); + list.add(executorCorePoolSize); + list.add(executorQueueSize); + return list; } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java index 849dd6a..3c01704 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java @@ -45,6 +45,8 @@ import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.runtime.api.SpecConsumer; +import lombok.Getter; + /** * A {@link 
JobConfigurationManager} that fetches job specs from a {@link SpecConsumer} in a loop @@ -56,6 +58,7 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager { private final ExecutorService fetchJobSpecExecutor; + @Getter private final SpecConsumer specConsumer; private final long stopTimeoutSeconds; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java index 23966e9..4764603 100644 --- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java @@ -21,18 +21,29 @@ import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.Tag; + import org.slf4j.Logger; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -49,6 +60,9 @@ import org.apache.gobblin.util.CompletedFuture; import org.apache.gobblin.util.ConfigUtils; import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY; + +import javax.annotation.Nonnull; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -56,12 +70,16 @@ import lombok.extern.slf4j.Slf4j; * SpecConsumer that consumes from kafka in a streaming manner * Implemented {@link AbstractIdleService} for starting up and shutting down. */ -public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable { +public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable, StandardMetricsBridge { public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize"; private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100; + @Getter private final AvroJobSpecKafkaJobMonitor _jobMonitor; private final BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> _jobSpecQueue; private final MutableJobCatalog _jobCatalog; + private final MetricContext _metricContext; + private final Metrics _metrics; + private final boolean _isInstrumentedEnabled; public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) { String topic = config.getString(SPEC_KAFKA_TOPICS_KEY); Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic, @@ -73,10 +91,12 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S } catch (IOException e) { throw new RuntimeException("Could not create job monitor", e); } - + _isInstrumentedEnabled = 
GobblinMetrics.isEnabled(ConfigUtils.configToProperties(config)); _jobCatalog = jobCatalog; _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE", DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE)); + _metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); + _metrics = new Metrics(this._metricContext); } public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) { @@ -98,7 +118,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S try { Pair<SpecExecutor.Verb, Spec> specPair = _jobSpecQueue.take(); - + _metrics.jobSpecDeqCount.incrementAndGet(); do { changesSpecs.add(specPair); @@ -145,6 +165,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S try { _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.ADD, addedJob)); + _metrics.jobSpecEnqCount.incrementAndGet(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -159,6 +180,7 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props); _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.DELETE, jobSpecBuilder.build())); + _metrics.jobSpecEnqCount.incrementAndGet(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -169,9 +191,73 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S try { _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.UPDATE, updatedJob)); + _metrics.jobSpecEnqCount.incrementAndGet(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } + + private class Metrics extends StandardMetricsBridge.StandardMetrics { + private ContextAwareGauge<Integer> jobSpecQueueSize; + private ContextAwareGauge<Long> jobSpecEnq; + private 
ContextAwareGauge<Long> jobSpecDeq; + private ContextAwareGauge<Long> jobSpecConsumed; + private AtomicLong jobSpecEnqCount = new AtomicLong(0); + private AtomicLong jobSpecDeqCount = new AtomicLong(0); + + public static final String SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE = "specConsumerJobSpecQueueSize"; + public static final String SPEC_CONSUMER_JOB_SPEC_ENQ = "specConsumerJobSpecEnq"; + public static final String SPEC_CONSUMER_JOB_SPEC_DEQ = "specConsumerJobSpecDeq"; + public static final String SPEC_CONSUMER_JOB_SPEC_CONSUMED = "specConsumerJobSpecConsumed"; + + + public Metrics(MetricContext context) { + this.jobSpecQueueSize = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size()); + this.jobSpecEnq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get()); + this.jobSpecDeq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get()); + this.jobSpecConsumed = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED, + ()->StreamingKafkaSpecConsumer.this._jobMonitor.getNewSpecs().getCount() + StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount()); + } + + public Collection<ContextAwareGauge<?>> getGauges() { + List list = Lists.newArrayList(); + list.add(jobSpecQueueSize); + list.add(jobSpecEnq); + list.add(jobSpecDeq); + list.add(jobSpecConsumed); + return list; + } + } + + @Override + public StandardMetrics getStandardMetrics() { + throw new UnsupportedOperationException("Implemented in sub class"); + } + + @Nonnull + @Override + public MetricContext getMetricContext() { + return _metricContext; + } + + @Override + public boolean isInstrumentationEnabled() { + return _isInstrumentedEnabled; + } + + @Override + public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) { + return ImmutableList.of(); + } + + @Override + public void switchMetricContext(List<Tag<?>> tags) { + throw new 
UnsupportedOperationException(); + } + + @Override + public void switchMetricContext(MetricContext context) { + throw new UnsupportedOperationException(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java index 950b86d..42ecef3 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java @@ -95,12 +95,7 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedJobs.get()); this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedJobs.get()); this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedJobs.get()); - this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{ - long startTime = System.currentTimeMillis(); - int size = jobCatalog.getJobs().size(); - updateGetJobTime(startTime); - return size; - }); + this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->(int)(totalAddedJobs.get() - totalDeletedJobs.get())); } public void updateGetJobTime(long startTime) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java index a7e5878..3f50ee7 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java @@ -49,6 +49,12 @@ public interface JobExecutionLauncher extends Instrumentable { public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling"; public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching"; + public static final String EXECUTOR_ACTIVE_COUNT = "executorActiveCount"; + public static final String EXECUTOR_MAX_POOL_SIZE = "executorMaximumPoolSize"; + public static final String EXECUTOR_POOL_SIZE = "executorPoolSize"; + public static final String EXECUTOR_CORE_POOL_SIZE = "executorCorePoolSize"; + public static final String EXECUTOR_QUEUE_SIZE = "executorQueueSize"; + public static final String TRACKING_EVENT_NAME = "JobExecutionLauncherEvent"; public static final String JOB_EXECID_META = "jobExecId"; public static final String JOB_LAUNCHED_OPERATION_TYPE = "JobLaunched"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/de83a3fb/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java index 8181c16..ba79305 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java @@ -35,6 +35,7 @@ import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.Either; import kafka.message.MessageAndMetadata; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -51,7 
+52,9 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest"; private final MutableJobCatalog jobCatalog; + @Getter private Counter newSpecs; + @Getter private Counter remmovedSpecs; /**