This is an automated email from the ASF dual-hosted git repository. mingmxu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 34d8bc9 [BEAM-3525] Fix KafkaIO metric (#4524) 34d8bc9 is described below commit 34d8bc9079a5a4b3d3a5476b2d4b48072be0dff4 Author: Raghu Angadi <rang...@apache.org> AuthorDate: Tue Jan 30 15:39:29 2018 -0800 [BEAM-3525] Fix KafkaIO metric (#4524) * Fix a KafkaIO metric A couple of fixes: 'checkpointMarkCommits' was incremented outside a Beam API context (inside consumerPollLoop), which is not supported. Instead, increment new metric 'enqueued'. 'enqueued' - 'skipped' gives total number of actual commits. Reverted an earlier PR that removed a test dependency. It is required for tests in order to see test output. * Verify commits metric in unit test. --- sdks/java/io/kafka/pom.xml | 6 ++++++ .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 18 +++++++++--------- .../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 14 ++++++++++++++ 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml index 38afa00..b04f5bf 100644 --- a/sdks/java/io/kafka/pom.xml +++ b/sdks/java/io/kafka/pom.xml @@ -127,5 +127,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 83d702c..996a460 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -911,9 +911,9 @@ public class KafkaIO { @VisibleForTesting static final String METRIC_NAMESPACE = "KafkaIOReader"; @VisibleForTesting - static final String CHECKPOINT_MARK_COMMITS_METRIC = "checkpointMarkCommits"; - private static final String 
CHECKPOINT_MARK_COMMIT_SKIPS_METRIC = "checkpointMarkCommitSkips"; - + static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = "checkpointMarkCommitsEnqueued"; + private static final String CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC = + "checkpointMarkCommitsSkipped"; private final UnboundedKafkaSource<K, V> source; private final String name; @@ -932,11 +932,11 @@ public class KafkaIO { private final Counter bytesReadBySplit; private final Gauge backlogBytesOfSplit; private final Gauge backlogElementsOfSplit; - private final Counter checkpointMarkCommits = Metrics.counter( - METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_METRIC); + private final Counter checkpointMarkCommitsEnqueued = Metrics.counter( + METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC); // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed). - private final Counter checkpointMarkCommitSkips = Metrics.counter( - METRIC_NAMESPACE, CHECKPOINT_MARK_COMMIT_SKIPS_METRIC); + private final Counter checkpointMarkCommitsSkipped = Metrics.counter( + METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_SKIPPED_METRIC); /** * The poll timeout while reading records from Kafka. 
@@ -1130,7 +1130,6 @@ public class KafkaIO { p -> new OffsetAndMetadata(p.getNextOffset()) )) ); - checkpointMarkCommits.inc(); } /** @@ -1142,8 +1141,9 @@ public class KafkaIO { */ void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) { if (finalizedCheckpointMark.getAndSet(checkpointMark) != null) { - checkpointMarkCommitSkips.inc(); + checkpointMarkCommitsSkipped.inc(); } + checkpointMarkCommitsEnqueued.inc(); } private void nextBatch() { diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 08b9d7c..08338d8 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka; import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -714,6 +715,19 @@ public class KafkaIOTest { // since gauge values may be inconsistent in some environments assert only on their existence. assertThat(backlogBytesMetrics.gauges(), IsIterableWithSize.iterableWithSize(1)); + + // Check checkpointMarkCommitsEnqueued metric. 
+ MetricQueryResults commitsEnqueuedMetrics = + result.metrics().queryMetrics( + MetricsFilter.builder() + .addNameFilter( + MetricNameFilter.named( + KafkaIO.UnboundedKafkaReader.METRIC_NAMESPACE, + KafkaIO.UnboundedKafkaReader.CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC)) + .build()); + + assertThat(commitsEnqueuedMetrics.counters(), IsIterableWithSize.iterableWithSize(1)); + assertThat(commitsEnqueuedMetrics.counters().iterator().next().attempted(), greaterThan(0L)); } @Test -- To stop receiving notification emails like this one, please contact ming...@apache.org.