This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new 4844604 KAFKA-10134: Use long poll if we do not have fetchable partitions (#8934) 4844604 is described below commit 48446042ea322c168238398a9fd2ed236a7ac869 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Wed Jul 8 09:51:50 2020 -0700 KAFKA-10134: Use long poll if we do not have fetchable partitions (#8934) The intention of using poll(0) is to not block on rebalance but still return some data; however, `updateAssignmentMetadataIfNeeded` has three distinct pieces of logic: 1) discover the coordinator if necessary, 2) join the group if necessary, 3) refresh metadata and fetch positions if necessary. We only want 2) to be non-blocking, not the others, since e.g. when the coordinator is down, the heartbeat would expire and cause the consumer to fetch with timeout 0 as well, causing unnecessarily high CPU usage. Since splitting this function is a rather big change to make as a last-minute blocker fix for 2.6, I made a smaller change: `updateAssignmentMetadataIfNeeded` now takes an optional boolean flag indicating whether 2) above should wait until the timer expires or the join completes; otherwise it does not wait on the join-group future and just polls with a zero timer.
Reviewers: Jason Gustafson <ja...@confluent.io> --- .../apache/kafka/clients/consumer/KafkaConsumer.java | 17 ++++++++--------- .../clients/consumer/internals/ConsumerCoordinator.java | 13 ++++++++++--- .../kafka/clients/consumer/KafkaConsumerTest.java | 9 +++++++-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 145dd05..438c563 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1228,19 +1228,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } - // poll for new data until the timeout expires do { client.maybeTriggerWakeup(); if (includeMetadataInTimeout) { - // try to update assignment metadata BUT do not need to block on the timer, - // since even if we are 1) in the middle of a rebalance or 2) have partitions - // with unknown starting positions we may still want to return some data - // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms - // to never block on completing the rebalance procedure if there's any - updateAssignmentMetadataIfNeeded(time.timer(0L)); + // try to update assignment metadata BUT do not need to block on the timer for join group + updateAssignmentMetadataIfNeeded(timer, false); } else { - while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) { + while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) { log.warn("Still waiting for metadata"); } } @@ -1272,7 +1267,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * Visible for testing */ boolean updateAssignmentMetadataIfNeeded(final Timer timer) { - if (coordinator != null && 
!coordinator.poll(timer)) { + return updateAssignmentMetadataIfNeeded(timer, true); + } + + boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) { + if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) { return false; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 2978611..66e713a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -436,18 +436,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } + // for testing + boolean poll(Timer timer) { + return poll(timer, true); + } + /** * Poll for coordinator events. This ensures that the coordinator is known and that the consumer * has joined the group (if it is using group management). This also handles periodic offset commits * if they are enabled. * <p> - * Returns early if the timeout expires + * Returns early if the timeout expires or if waiting on rejoin is not required * * @param timer Timer bounding how long this method can block + * @param waitForJoinGroup Boolean flag indicating if we should wait until re-join group completes * @throws KafkaException if the rebalance callback throws an exception * @return true iff the operation succeeded */ - public boolean poll(Timer timer) { + public boolean poll(Timer timer, boolean waitForJoinGroup) { maybeUpdateSubscriptionMetadata(); invokeCompletedOffsetCommitCallbacks(); @@ -487,7 +493,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { maybeUpdateSubscriptionMetadata(); } - if (!ensureActiveGroup(timer)) { + // if not wait for join group, we would just use a timer of 0 + if (!ensureActiveGroup(waitForJoinGroup ? 
timer : time.timer(0L))) { return false; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 00d4db4..4644ffd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1785,14 +1785,19 @@ public class KafkaConsumerTest { initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer)); + Node node = metadata.fetch().nodes().get(0); Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); - // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group - consumer.poll(Duration.ZERO); + // a first poll with zero millisecond would not complete the rebalance consumer.poll(Duration.ZERO); assertEquals(Utils.mkSet(topic, topic2), consumer.subscription()); + assertEquals(Collections.emptySet(), consumer.assignment()); + + // a second poll with non-zero milliseconds would complete three round-trips (discover, join, sync) + consumer.poll(Duration.ofMillis(100L)); + assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment()); // prepare a response of the outstanding fetch so that we have data available on the next poll