Repository: beam
Updated Branches:
  refs/heads/master 81474aeaf -> 47821ad69
[BEAM-1398] KafkaIO metrics.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/930c27f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/930c27f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/930c27f5

Branch: refs/heads/master
Commit: 930c27f55fc980702089fe58fdb0edded96a2ac6
Parents: 81474ae
Author: Aviem Zur <aviem...@gmail.com>
Authored: Tue Mar 28 07:29:53 2017 +0300
Committer: Aviem Zur <aviem...@gmail.com>
Committed: Sat Apr 29 18:08:19 2017 +0300

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  65 +++++++++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 130 +++++++++++++++++++
 2 files changed, 194 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/930c27f5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 47d8281..211f1a4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -69,6 +69,10 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer;
 import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.SinkMetrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -950,6 +954,13 @@ public class KafkaIO {
     private Deserializer<K> keyDeserializerInstance = null;
     private Deserializer<V> valueDeserializerInstance = null;
 
+    private final Counter elementsRead = SourceMetrics.elementsRead();
+    private final Counter bytesRead = SourceMetrics.bytesRead();
+    private final Counter elementsReadBySplit;
+    private final Counter bytesReadBySplit;
+    private final Gauge backlogBytesOfSplit;
+    private final Gauge backlogElementsOfSplit;
+
     private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
     private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
 
@@ -1023,10 +1034,18 @@ public class KafkaIO {
 
     synchronized long approxBacklogInBytes() {
       // Note that this is an estimate of uncompressed backlog.
+      long backlogMessageCount = backlogMessageCount();
+      if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
+        return UnboundedReader.BACKLOG_UNKNOWN;
+      }
+      return (long) (backlogMessageCount * avgRecordSize);
+    }
+
+    synchronized long backlogMessageCount() {
       if (latestOffset < 0 || nextOffset < 0) {
         return UnboundedReader.BACKLOG_UNKNOWN;
       }
-      return Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));
+      return Math.max(0, (latestOffset - nextOffset));
     }
   }
 
@@ -1065,6 +1084,13 @@ public class KafkaIO {
           partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
         }
       }
+
+      String splitId = String.valueOf(source.id);
+
+      elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
+      bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
+      backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
+      backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
     }
 
     private void consumerPollLoop() {
@@ -1194,6 +1220,9 @@ public class KafkaIO {
       if (curBatch.hasNext()) {
         PartitionState pState = curBatch.next();
 
+        elementsRead.inc();
+        elementsReadBySplit.inc();
+
         if (!pState.recordIter.hasNext()) { // -- (c)
           pState.recordIter = Collections.emptyIterator(); // drop ref
           curBatch.remove();
@@ -1241,6 +1270,8 @@ public class KafkaIO {
         int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length)
             + (rawRecord.value() == null ? 0 : rawRecord.value().length);
         pState.recordConsumed(offset, recordSize);
+        bytesRead.inc(recordSize);
+        bytesReadBySplit.inc(recordSize);
         return true;
 
       } else { // -- (b)
@@ -1278,6 +1309,19 @@ public class KafkaIO {
       LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
     }
 
+    private void reportBacklog() {
+      long splitBacklogBytes = getSplitBacklogBytes();
+      if (splitBacklogBytes < 0) {
+        splitBacklogBytes = UnboundedReader.BACKLOG_UNKNOWN;
+      }
+      backlogBytesOfSplit.set(splitBacklogBytes);
+      long splitBacklogMessages = getSplitBacklogMessageCount();
+      if (splitBacklogMessages < 0) {
+        splitBacklogMessages = UnboundedReader.BACKLOG_UNKNOWN;
+      }
+      backlogElementsOfSplit.set(splitBacklogMessages);
+    }
+
     @Override
     public Instant getWatermark() {
       if (curRecord == null) {
@@ -1291,6 +1335,7 @@ public class KafkaIO {
 
     @Override
     public CheckpointMark getCheckpointMark() {
+      reportBacklog();
       return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
           Lists.transform(partitionStates,
               new Function<PartitionState, PartitionMark>() {
@@ -1336,6 +1381,20 @@ public class KafkaIO {
       return backlogBytes;
     }
 
+    private long getSplitBacklogMessageCount() {
+      long backlogCount = 0;
+
+      for (PartitionState p : partitionStates) {
+        long pBacklog = p.backlogMessageCount();
+        if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        backlogCount += pBacklog;
+      }
+
+      return backlogCount;
+    }
+
     @Override
     public void close() throws IOException {
       closed.set(true);
@@ -1561,6 +1620,8 @@ public class KafkaIO {
       producer.send(
           new ProducerRecord<K, V>(spec.getTopic(), kv.getKey(), kv.getValue()),
           new SendCallback());
+
+      elementsWritten.inc();
     }
 
     @FinishBundle
@@ -1585,6 +1646,8 @@ public class KafkaIO {
     private transient Exception sendException = null;
     private transient long numSendFailures = 0;
 
+    private final Counter elementsWritten = SinkMetrics.elementsWritten();
+
     KafkaWriter(Write<K, V> spec) {
       this.spec = spec;
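
The reader and writer changes above all funnel through the Beam Metrics API: Counters
(elements/bytes read, elements written) accumulate monotonically, while Gauges (backlog at
checkpoint time) record a latest-value snapshot. Below is a minimal, self-contained sketch of
that same Counter/Gauge pattern; the class name, "example" namespace, and metric names are
illustrative only, not part of this commit.

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Gauge;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    // Hypothetical DoFn illustrating the Counter/Gauge usage pattern in the diff above.
    class MeteredFn extends DoFn<byte[], byte[]> {
      // Counters accumulate across bundles; a Gauge keeps only the most recent value.
      private final Counter elements = Metrics.counter("example", "elementsProcessed");
      private final Counter bytes = Metrics.counter("example", "bytesProcessed");
      private final Gauge lastRecordSize = Metrics.gauge("example", "lastRecordSize");

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        byte[] element = ctx.element();
        elements.inc();                // one more element seen
        bytes.inc(element.length);     // accumulate byte count
        lastRecordSize.set(element.length); // snapshot of the latest record size
        ctx.output(element);
      }
    }
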

http://git-wip-us.apache.org/repos/asf/beam/blob/930c27f5/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d713d90..feb65da 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -41,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -50,7 +53,17 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricMatchers;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.SinkMetrics;
+import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -90,6 +103,7 @@ import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -550,6 +564,76 @@ public class KafkaIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testUnboundedSourceMetrics() {
+    int numElements = 1000;
+
+    String readStep = "readFromKafka";
+
+    p.apply(readStep,
+        mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata());
+
+    PipelineResult result = p.run();
+
+    String splitId = "0";
+
+    MetricName elementsRead = SourceMetrics.elementsRead().getName();
+    MetricName elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId).getName();
+    MetricName bytesRead = SourceMetrics.bytesRead().getName();
+    MetricName bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId).getName();
+    MetricName backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId).getName();
+    MetricName backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId).getName();
+
+    MetricQueryResults metrics = result.metrics().queryMetrics(
+        MetricsFilter.builder().build());
+
+    Iterable<MetricResult<Long>> counters = metrics.counters();
+    Iterable<MetricResult<GaugeResult>> gauges = metrics.gauges();
+
+    assertThat(counters, hasItem(
+        MetricMatchers.attemptedMetricsResult(
+            elementsRead.namespace(),
+            elementsRead.name(),
+            readStep,
+            1000L)));
+
+    assertThat(counters, hasItem(
+        MetricMatchers.attemptedMetricsResult(
+            elementsReadBySplit.namespace(),
+            elementsReadBySplit.name(),
+            readStep,
+            1000L)));
+
+    assertThat(counters, hasItem(
+        MetricMatchers.attemptedMetricsResult(
+            bytesRead.namespace(),
+            bytesRead.name(),
+            readStep,
+            12000L)));
+
+    assertThat(counters, hasItem(
+        MetricMatchers.attemptedMetricsResult(
+            bytesReadBySplit.namespace(),
+            bytesReadBySplit.name(),
+            readStep,
+            12000L)));
+
+    assertThat(gauges, hasItem(
+        attemptedMetricsResult(
+            backlogElementsOfSplit.namespace(),
+            backlogElementsOfSplit.name(),
+            readStep,
+            GaugeResult.create(0L, Instant.now()))));
+
+    assertThat(gauges, hasItem(
+        attemptedMetricsResult(
+            backlogBytesOfSplit.namespace(),
+            backlogBytesOfSplit.name(),
+            readStep,
+            GaugeResult.create(0L, Instant.now()))));
+  }
+
+  @Test
   public void testSink() throws Exception {
     // Simply read from kafka source and write to kafka sink. Then verify the records
     // are correctly published to mock kafka producer.
@@ -752,6 +836,52 @@ public class KafkaIOTest {
         instanceof VarLongCoder);
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSinkMetrics() throws Exception {
+    // Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.
+
+    int numElements = 1000;
+
+    synchronized (MOCK_PRODUCER_LOCK) {
+
+      MOCK_PRODUCER.clear();
+
+      ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+
+      String topic = "test";
+
+      p
+          .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+              .withoutMetadata())
+          .apply("writeToKafka", KafkaIO.<Integer, Long>write()
+              .withBootstrapServers("none")
+              .withTopic(topic)
+              .withKeySerializer(IntegerSerializer.class)
+              .withValueSerializer(LongSerializer.class)
+              .withProducerFactoryFn(new ProducerFactoryFn()));
+
+      PipelineResult result = p.run();
+
+      MetricName elementsWritten = SinkMetrics.elementsWritten().getName();
+
+      MetricQueryResults metrics = result.metrics().queryMetrics(
+          MetricsFilter.builder()
+              .addNameFilter(MetricNameFilter.inNamespace(elementsWritten.namespace()))
+              .build());
+
+      assertThat(metrics.counters(), hasItem(
+          MetricMatchers.attemptedMetricsResult(
+              elementsWritten.namespace(),
+              elementsWritten.name(),
+              "writeToKafka",
+              1000L)));
+
+      completionThread.shutdown();
+    }
+  }
+
   private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {
 
     // verify that appropriate messages are written to kafka
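
For pipeline authors, the metrics this commit adds are queryable from the PipelineResult once
the pipeline runs on a runner with metrics support, exactly as the tests above do. A hedged
sketch follows; the helper name is hypothetical, `result` is assumed to come from p.run(), and
the name()/attempted() accessors reflect the MetricResult API of this era, as used in the tests.

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.MetricName;
    import org.apache.beam.sdk.metrics.MetricNameFilter;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;
    import org.apache.beam.sdk.metrics.SourceMetrics;

    // Hypothetical helper: print the KafkaIO source counters from a finished pipeline.
    static void printKafkaSourceCounters(PipelineResult result) {
      MetricName elementsRead = SourceMetrics.elementsRead().getName();
      MetricQueryResults metrics = result.metrics().queryMetrics(
          MetricsFilter.builder()
              .addNameFilter(MetricNameFilter.inNamespace(elementsRead.namespace()))
              .build());
      for (MetricResult<Long> counter : metrics.counters()) {
        // attempted() is the runner's best-effort value; committed() may be unsupported.
        System.out.println(counter.name() + " = " + counter.attempted());
      }
    }
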