[kafka] branch trunk updated (f185016 -> 731630e)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from f185016 MINOR: Should cleanup the tasks after dirty close (#8433) add 731630e KAFKA-9818: improve error message to debug test (#8423) No new revisions were added by this update. Summary of changes: .../kafka/streams/processor/internals/RecordCollectorTest.java | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-)
[kafka] branch trunk updated (9d96523 -> f185016)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 9d96523 MINOR: Fix log cleaner offset range log message (#8435) add f185016 MINOR: Should cleanup the tasks after dirty close (#8433) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/processor/internals/TaskManager.java | 4 ++-- .../apache/kafka/streams/processor/internals/TaskManagerTest.java | 6 ++ 2 files changed, 8 insertions(+), 2 deletions(-)
[kafka] branch trunk updated: MINOR: Fix log cleaner offset range log message (#8435)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9d96523 MINOR: Fix log cleaner offset range log message (#8435) 9d96523 is described below commit 9d965236a4459ddfaf333203cc4465902247926a Author: Jason Gustafson AuthorDate: Mon Apr 6 17:11:12 2020 -0700 MINOR: Fix log cleaner offset range log message (#8435) The upper limit offset is displayed incorrectly in the log cleaner summary message. For example: ``` Log cleaner thread 0 cleaned log __consumer_offsets-47 (dirty section = [358800359, 358800359]) ``` We should be using the next dirty offset as the upper limit. Reviewers: David Arthur --- core/src/main/scala/kafka/log/LogCleaner.scala | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 8252151..2a20292 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -356,17 +356,18 @@ class LogCleaner(initialConfig: CleanerConfig, } private def cleanLog(cleanable: LogToClean): Unit = { - var endOffset = cleanable.firstDirtyOffset + val startOffset = cleanable.firstDirtyOffset + var endOffset = startOffset try { val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable) -recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats) endOffset = nextDirtyOffset +recordStats(cleaner.id, cleanable.log.name, startOffset, endOffset, cleanerStats) } catch { case _: LogCleaningAbortedException => // task can be aborted, let it go. case _: KafkaStorageException => // partition is already offline. let it go. case e: IOException => val logDirectory = cleanable.log.parentDir - val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException" + val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir $logDirectory due to IOException" logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e) } finally { cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.parentDirFile, endOffset)
[kafka] branch 2.4 updated: KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new d783a9f KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420) d783a9f is described below commit d783a9f9f697c4df74abdc2a8ee18c533aa0716e Author: Rajini Sivaram AuthorDate: Tue Apr 7 01:00:11 2020 +0100 KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420) On metadata change for assigned topics, we trigger rebalance, revoke partitions and send JoinGroup. If metadata reverts to the original value and JoinGroup fails, we don't resend JoinGroup because we don't set `rejoinNeeded`. This PR sets `rejoinNeeded=true` when rebalance is triggered due to metadata change to ensure that we retry on failure. Reviewers: Boyang Chen , Chia-Ping Tsai , Jason Gustafson --- .../consumer/internals/ConsumerCoordinator.java| 5 +- .../internals/ConsumerCoordinatorTest.java | 58 ++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index b4d9fa5..6b905c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -727,11 +727,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // we need to rejoin if we performed the assignment and metadata has changed; // also for those owned-but-no-longer-existed partitions we should drop them as lost -if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) +if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) { +requestRejoin(); return true; +} // we need to join if our subscription has changed since the last join if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) { +requestRejoin(); return true; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 9e6128b..afef951 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -843,6 +843,64 @@ public class ConsumerCoordinatorTest { assertFalse(coordinator.rejoinNeededOrPending()); } +/** + * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails + * and metadata reverts to its original value, the consumer should still retry JoinGroup. + */ +@Test +public void testRebalanceWithMetadataChange() { +final String consumerId = "leader"; +final List topics = Arrays.asList(topic1, topic2); +final List partitions = Arrays.asList(t1p, t2p); +subscriptions.subscribe(toSet(topics), rebalanceListener); +client.updateMetadata(TestUtils.metadataUpdateWith(1, +Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1; +coordinator.maybeUpdateSubscriptionMetadata(); + +client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +Map> initialSubscription = singletonMap(consumerId, topics); +partitionAssignor.prepare(singletonMap(consumerId, partitions)); + +client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE)); +client.prepareResponse(syncGroupResponse(partitions, Errors.NONE)); +coordinator.poll(time.timer(Long.MAX_VALUE)); + +// rejoin will only be set in the next poll call +assertFalse(coordinator.rejoinNeededOrPending()); +assertEquals(toSet(topics), subscriptions.subscription()); +assertEquals(toSet(partitions), subscriptions.assignedPartitions()); +assertEquals(0, rebalanceListener.revokedCount); +assertNull(rebalanceListener.revoked); +assertEquals(1, rebalanceListener.assignedCount); + +// Change metadata to trigger rebalance. +client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1))); +coordinator.poll(time.timer(0)); + +// Revert metadata to original value. Fail pending JoinGroup. Another +// JoinGroup should be sent, which will be completed successfully. +
[kafka] branch 2.5 updated: KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.5 by this push: new fd6bd32 KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420) fd6bd32 is described below commit fd6bd32c7edbd779b7dd7848c6726536954e78e3 Author: Rajini Sivaram AuthorDate: Tue Apr 7 01:00:11 2020 +0100 KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420) On metadata change for assigned topics, we trigger rebalance, revoke partitions and send JoinGroup. If metadata reverts to the original value and JoinGroup fails, we don't resend JoinGroup because we don't set `rejoinNeeded`. This PR sets `rejoinNeeded=true` when rebalance is triggered due to metadata change to ensure that we retry on failure. Reviewers: Boyang Chen , Chia-Ping Tsai , Jason Gustafson --- .../consumer/internals/ConsumerCoordinator.java| 5 +- .../internals/ConsumerCoordinatorTest.java | 58 ++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 283e769..8faa34a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -739,11 +739,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // we need to rejoin if we performed the assignment and metadata has changed; // also for those owned-but-no-longer-existed partitions we should drop them as lost -if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) +if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) { +requestRejoin(); return true; +} // we need to join if our subscription has changed since the last join if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) { +requestRejoin(); return true; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 06491eb..abf2938 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -845,6 +845,64 @@ public class ConsumerCoordinatorTest { assertFalse(coordinator.rejoinNeededOrPending()); } +/** + * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails + * and metadata reverts to its original value, the consumer should still retry JoinGroup. + */ +@Test +public void testRebalanceWithMetadataChange() { +final String consumerId = "leader"; +final List topics = Arrays.asList(topic1, topic2); +final List partitions = Arrays.asList(t1p, t2p); +subscriptions.subscribe(toSet(topics), rebalanceListener); +client.updateMetadata(TestUtils.metadataUpdateWith(1, +Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1; +coordinator.maybeUpdateSubscriptionMetadata(); + +client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +Map> initialSubscription = singletonMap(consumerId, topics); +partitionAssignor.prepare(singletonMap(consumerId, partitions)); + +client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE)); +client.prepareResponse(syncGroupResponse(partitions, Errors.NONE)); +coordinator.poll(time.timer(Long.MAX_VALUE)); + +// rejoin will only be set in the next poll call +assertFalse(coordinator.rejoinNeededOrPending()); +assertEquals(toSet(topics), subscriptions.subscription()); +assertEquals(toSet(partitions), subscriptions.assignedPartitions()); +assertEquals(0, rebalanceListener.revokedCount); +assertNull(rebalanceListener.revoked); +assertEquals(1, rebalanceListener.assignedCount); + +// Change metadata to trigger rebalance. +client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1))); +coordinator.poll(time.timer(0)); + +// Revert metadata to original value. Fail pending JoinGroup. Another +// JoinGroup should be sent, which will be completed successfully. +
[kafka] branch trunk updated: KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 588e8a5 KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420) 588e8a5 is described below commit 588e8a5be83037225716a78358d0203a9e070168 Author: Rajini Sivaram AuthorDate: Tue Apr 7 01:00:11 2020 +0100 KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420) On metadata change for assigned topics, we trigger rebalance, revoke partitions and send JoinGroup. If metadata reverts to the original value and JoinGroup fails, we don't resend JoinGroup because we don't set `rejoinNeeded`. This PR sets `rejoinNeeded=true` when rebalance is triggered due to metadata change to ensure that we retry on failure. Reviewers: Boyang Chen , Chia-Ping Tsai , Jason Gustafson --- .../consumer/internals/ConsumerCoordinator.java| 5 +- .../internals/ConsumerCoordinatorTest.java | 58 ++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index b8d44b4..ab49837 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -747,11 +747,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // we need to rejoin if we performed the assignment and metadata has changed; // also for those owned-but-no-longer-existed partitions we should drop them as lost -if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) +if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) { +requestRejoin(); return true; +} // we need to join if our subscription has changed since the last join if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) { +requestRejoin(); return true; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 9a189a8..aaf2962 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -974,6 +974,64 @@ public class ConsumerCoordinatorTest { assertFalse(coordinator.rejoinNeededOrPending()); } +/** + * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails + * and metadata reverts to its original value, the consumer should still retry JoinGroup. + */ +@Test +public void testRebalanceWithMetadataChange() { +final String consumerId = "leader"; +final List topics = Arrays.asList(topic1, topic2); +final List partitions = Arrays.asList(t1p, t2p); +subscriptions.subscribe(toSet(topics), rebalanceListener); +client.updateMetadata(TestUtils.metadataUpdateWith(1, +Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1; +coordinator.maybeUpdateSubscriptionMetadata(); + +client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +Map> initialSubscription = singletonMap(consumerId, topics); +partitionAssignor.prepare(singletonMap(consumerId, partitions)); + +client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE)); +client.prepareResponse(syncGroupResponse(partitions, Errors.NONE)); +coordinator.poll(time.timer(Long.MAX_VALUE)); + +// rejoin will only be set in the next poll call +assertFalse(coordinator.rejoinNeededOrPending()); +assertEquals(toSet(topics), subscriptions.subscription()); +assertEquals(toSet(partitions), subscriptions.assignedPartitions()); +assertEquals(0, rebalanceListener.revokedCount); +assertNull(rebalanceListener.revoked); +assertEquals(1, rebalanceListener.assignedCount); + +// Change metadata to trigger rebalance. +client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1))); +coordinator.poll(time.timer(0)); + +// Revert metadata to original value. Fail pending JoinGroup. Another +// JoinGroup should be sent, which will be completed successfully. +
[kafka] branch trunk updated (712ac52 -> 82dff1d)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 712ac52 KAFKA-9793: Expand the try-catch for task commit in HandleAssignment (#8402) add 82dff1d KAFKA-9753: A few more metrics to add (#8371) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/KafkaStreams.java | 15 +- .../streams/internals/metrics/ClientMetrics.java | 12 ++ .../streams/processor/internals/StreamTask.java| 5 +- .../streams/processor/internals/StreamThread.java | 13 +- .../kafka/streams/processor/internals/Task.java| 2 +- .../streams/processor/internals/TaskManager.java | 2 +- .../internals/metrics/StreamsMetricsImpl.java | 1 + .../processor/internals/metrics/TaskMetrics.java | 20 +- .../processor/internals/metrics/ThreadMetrics.java | 39 .../org/apache/kafka/streams/KafkaStreamsTest.java | 1 + .../integration/MetricsIntegrationTest.java| 106 +- .../internals/metrics/ClientMetricsTest.java | 45 +++- .../kstream/internals/metrics/TaskMetricsTest.java | 24 +++ .../processor/internals/StreamTaskTest.java| 49 - .../processor/internals/StreamThreadTest.java | 24 ++- .../internals/metrics/ThreadMetricsTest.java | 230 + 16 files changed, 418 insertions(+), 170 deletions(-)