This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 7f17201  [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
7f17201 is described below

commit 7f17201640881e7f4bbf85c1d337735ba66168d6
Author: XuMingmin <xuming...@users.noreply.github.com>
AuthorDate: Wed Jan 30 01:28:14 2019 -0800

    [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 23 +++++++++
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java    | 56 +++++++++++++---------
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 43 +++++++++++++++++
 3 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8b3218b..f27ec68 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -345,6 +345,9 @@ public class KafkaIO {
 
     abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
 
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -380,6 +383,8 @@ public class KafkaIO {
       abstract Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory<K, V> timestampPolicyFactory);
 
+      abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
+
       abstract Read<K, V> build();
     }
 
@@ -656,6 +661,24 @@ public class KafkaIO {
       return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
     }
 
+    /**
+     * Sets additional configuration for the backend offset consumer. This may be required for a
+     * secured Kafka cluster, especially when you see a WARN log message similar to 'exception
+     * while fetching latest offset for partition {}. will be retried'.
+     *
+     * <p>In {@link KafkaIO#read()}, there are actually two consumers running in the backend:<br>
+     * 1. the main consumer, which reads data from Kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate the backlog by fetching the
+     * latest offsets;<br>
+     *
+     * <p>By default, the offset consumer inherits the configuration of the main consumer, with an
+     * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka
+     * cluster that requires additional configuration.
+     */
+    public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
+      return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
+    }
+
     /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
       return new TypedWithoutMetadata<>(this);
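For context on how the new option is meant to be used: a minimal sketch of a pipeline that pins the offset consumer to a group id the cluster's ACLs authorize. The broker list, topic, deserializers, and the assumption that group ACLs are the blocker are illustrative; only withOffsetConsumerConfigOverrides and the group id value come from this commit.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetConsumerOverrideExample {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Hypothetical scenario: the cluster's ACLs only authorize known group ids,
        // so the auto-generated "<name>_offset_consumer_<random>_<group>" id would
        // be rejected. Pin the offset consumer to an authorized group id instead.
        Map<String, Object> offsetConsumerOverrides = new HashMap<>();
        offsetConsumerOverrides.put(ConsumerConfig.GROUP_ID_CONFIG, "group.offsetConsumer");

        PCollection<KV<Long, String>> records =
            pipeline.apply(
                KafkaIO.<Long, String>read()
                    .withBootstrapServers("broker_1:9092,broker_2:9092") // placeholder brokers
                    .withTopic("my_topic")                               // placeholder topic
                    .withKeyDeserializer(LongDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withOffsetConsumerConfigOverrides(offsetConsumerOverrides)
                    .withoutMetadata());

        pipeline.run().waitUntilFinish();
      }
    }

The overrides are applied after the generated group id but before the forced isolation.level, so group.id can be replaced while read_uncommitted cannot; the new test below verifies exactly this.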
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index ee058aa..580b0bc 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -141,28 +141,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
     consumerPollThread.submit(this::consumerPollLoop);
 
     // offsetConsumer setup :
-
-    Object groupId = spec.getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
-    // override group_id and disable auto_commit so that it does not interfere with main consumer
-    String offsetGroupId =
-        String.format(
-            "%s_offset_consumer_%d_%s",
-            name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
-    Map<String, Object> offsetConsumerConfig = new HashMap<>(spec.getConsumerConfig());
-    offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
-    offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
-    // fetches latest offset for two reasons : (a) to calculate backlog (number of records
-    // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
-    // for (a) is to leave this config unchanged from the main config (i.e. if there are records
-    // that can't be read because of uncommitted records before them, they shouldn't
-    // ideally count towards backlog when "read_committed" is enabled. But (b)
-    // requires finding out if there are any records left to be read (committed or uncommitted).
-    // Rather than using two separate consumers we will go with better support for (b). If we do
-    // hit a case where a lot of records are not readable (due to some stuck transactions), the
-    // pipeline would report more backlog, but would not be able to consume it. It might be ok
-    // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
-    offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+    Map<String, Object> offsetConsumerConfig = getOffsetConsumerConfig();
     offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
 
     consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());
@@ -726,6 +705,39 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
     return backlogCount;
   }
 
+  @VisibleForTesting
+  Map<String, Object> getOffsetConsumerConfig() {
+    Map<String, Object> offsetConsumerConfig = new HashMap<>(source.getSpec().getConsumerConfig());
+    offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    Object groupId = source.getSpec().getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
+    // override group_id and disable auto_commit so that it does not interfere with main consumer
+    String offsetGroupId =
+        String.format(
+            "%s_offset_consumer_%d_%s",
+            name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
+    offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
+
+    if (source.getSpec().getOffsetConsumerConfig() != null) {
+      offsetConsumerConfig.putAll(source.getSpec().getOffsetConsumerConfig());
+    }
+
+    // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
+    // fetches latest offset for two reasons : (a) to calculate backlog (number of records
+    // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
+    // for (a) is to leave this config unchanged from the main config (i.e. if there are records
+    // that can't be read because of uncommitted records before them, they shouldn't
+    // ideally count towards backlog when "read_committed" is enabled. But (b)
+    // requires finding out if there are any records left to be read (committed or uncommitted).
+    // Rather than using two separate consumers we will go with better support for (b). If we do
+    // hit a case where a lot of records are not readable (due to some stuck transactions), the
+    // pipeline would report more backlog, but would not be able to consume it. It might be ok
+    // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
+    offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+
+    return offsetConsumerConfig;
+  }
+
   @Override
   public void close() throws IOException {
     closed.set(true);
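To clarify what the secondary consumer measures, here is a standalone sketch of a backlog estimate built on the plain Kafka client API. It is not the reader's actual code: estimateBacklog, its parameters, and the use of endOffsets() are this sketch's own choices; Beam's reader fetches latest offsets through its ConsumerSpEL compatibility shim instead.

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    class BacklogSketch {
      // Sum of "latest available offset minus next offset to read" across the
      // assigned partitions; this is the quantity the offset consumer exists to
      // measure, using the config assembled by getOffsetConsumerConfig().
      static long estimateBacklog(
          Consumer<?, ?> offsetConsumer,
          Collection<TopicPartition> partitions,
          Map<TopicPartition, Long> nextOffsetsToRead) {
        long backlog = 0;
        // endOffsets() fetches the latest offsets without moving the consumer's
        // position. With isolation.level=read_uncommitted (forced above), it
        // returns the high watermark, so uncommitted transactional records
        // count towards the backlog too.
        for (Map.Entry<TopicPartition, Long> end :
            offsetConsumer.endOffsets(partitions).entrySet()) {
          Long nextRead = nextOffsetsToRead.get(end.getKey());
          if (nextRead != null) {
            backlog += Math.max(0, end.getValue() - nextRead);
          }
        }
        return backlog;
      }
    }

Under read_committed, endOffsets() would return the last stable offset instead of the high watermark, which is why forcing read_uncommitted matters for the watermark-advance case described in the code comment above.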
"none" : groupId)); + offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId); + + if (source.getSpec().getOffsetConsumerConfig() != null) { + offsetConsumerConfig.putAll(source.getSpec().getOffsetConsumerConfig()); + } + + // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer + // fetches latest offset for two reasons : (a) to calculate backlog (number of records + // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do + // for (a) is to leave this config unchanged from the main config (i.e. if there are records + // that can't be read because of uncommitted records before them, they shouldn't + // ideally count towards backlog when "read_committed" is enabled. But (b) + // requires finding out if there are any records left to be read (committed or uncommitted). + // Rather than using two separate consumers we will go with better support for (b). If we do + // hit a case where a lot of records are not readable (due to some stuck transactions), the + // pipeline would report more backlog, but would not be able to consume it. It might be ok + // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale. + offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted"); + + return offsetConsumerConfig; + } + @Override public void close() throws IOException { closed.set(true); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 63e14a3..e6c3ca9 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -1528,6 +1528,49 @@ public class KafkaIOTest { } } + @Test + public void testOffsetConsumerConfigOverrides() throws Exception { + KafkaUnboundedReader reader1 = + new KafkaUnboundedReader( + new KafkaUnboundedSource( + KafkaIO.read() + .withBootstrapServers("broker_1:9092,broker_2:9092") + .withTopic("my_topic") + .withOffsetConsumerConfigOverrides(null), + 0), + null); + assertTrue( + reader1 + .getOffsetConsumerConfig() + .get(ConsumerConfig.GROUP_ID_CONFIG) + .toString() + .matches(".*_offset_consumer_\\d+_none")); + assertEquals( + false, reader1.getOffsetConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + assertEquals( + "read_uncommitted", + reader1.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)); + + String offsetGroupId = "group.offsetConsumer"; + KafkaUnboundedReader reader2 = + new KafkaUnboundedReader( + new KafkaUnboundedSource( + KafkaIO.read() + .withBootstrapServers("broker_1:9092,broker_2:9092") + .withTopic("my_topic") + .withOffsetConsumerConfigOverrides( + ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId)), + 0), + null); + assertEquals( + offsetGroupId, reader2.getOffsetConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG)); + assertEquals( + false, reader2.getOffsetConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + assertEquals( + "read_uncommitted", + reader2.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)); + } + private static void verifyProducerRecords( MockProducer<Integer, Long> mockProducer, String topic,