[kafka] branch trunk updated (f185016 -> 731630e)

2020-04-06 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from f185016  MINOR: Should cleanup the tasks after dirty close (#8433)
 add 731630e  KAFKA-9818: improve error message to debug test (#8423)

No new revisions were added by this update.

Summary of changes:
 .../kafka/streams/processor/internals/RecordCollectorTest.java | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)



[kafka] branch trunk updated (9d96523 -> f185016)

2020-04-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 9d96523  MINOR: Fix log cleaner offset range log message (#8435)
 add f185016  MINOR: Should cleanup the tasks after dirty close (#8433)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/processor/internals/TaskManager.java   | 4 ++--
 .../apache/kafka/streams/processor/internals/TaskManagerTest.java   | 6 ++
 2 files changed, 8 insertions(+), 2 deletions(-)



[kafka] branch trunk updated: MINOR: Fix log cleaner offset range log message (#8435)

2020-04-06 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 9d96523  MINOR: Fix log cleaner offset range log message (#8435)
9d96523 is described below

commit 9d965236a4459ddfaf333203cc4465902247926a
Author: Jason Gustafson 
AuthorDate: Mon Apr 6 17:11:12 2020 -0700

MINOR: Fix log cleaner offset range log message (#8435)

The upper limit offset is displayed incorrectly in the log cleaner summary 
message. For example:
```
Log cleaner thread 0 cleaned log __consumer_offsets-47 (dirty section = [358800359, 358800359])
```
We should be using the next dirty offset as the upper limit.

Reviewers: David Arthur 
---
 core/src/main/scala/kafka/log/LogCleaner.scala | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8252151..2a20292 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -356,17 +356,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   private def cleanLog(cleanable: LogToClean): Unit = {
-    var endOffset = cleanable.firstDirtyOffset
+    val startOffset = cleanable.firstDirtyOffset
+    var endOffset = startOffset
     try {
       val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
-      recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
       endOffset = nextDirtyOffset
+      recordStats(cleaner.id, cleanable.log.name, startOffset, endOffset, cleanerStats)
     } catch {
       case _: LogCleaningAbortedException => // task can be aborted, let it go.
       case _: KafkaStorageException => // partition is already offline. let it go.
       case e: IOException =>
         val logDirectory = cleanable.log.parentDir
-        val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
+        val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir $logDirectory due to IOException"
         logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
     } finally {
       cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.parentDirFile, endOffset)



[kafka] branch 2.4 updated: KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)

2020-04-06 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new d783a9f  KAFKA-9815; Ensure consumer always re-joins if JoinGroup 
fails (#8420)
d783a9f is described below

commit d783a9f9f697c4df74abdc2a8ee18c533aa0716e
Author: Rajini Sivaram 
AuthorDate: Tue Apr 7 01:00:11 2020 +0100

KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)

On metadata change for assigned topics, we trigger rebalance, revoke 
partitions and send JoinGroup. If metadata reverts to the original value and 
JoinGroup fails, we don't resend JoinGroup because we don't set `rejoinNeeded`. 
This PR sets `rejoinNeeded=true` when rebalance is triggered due to metadata 
change to ensure that we retry on failure.

Reviewers: Boyang Chen , Chia-Ping Tsai , Jason Gustafson 
---
 .../consumer/internals/ConsumerCoordinator.java|  5 +-
 .../internals/ConsumerCoordinatorTest.java | 58 ++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b4d9fa5..6b905c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -727,11 +727,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // we need to rejoin if we performed the assignment and metadata has changed;
         // also for those owned-but-no-longer-existed partitions we should drop them as lost
-        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
+        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
+            requestRejoin();
             return true;
+        }
 
         // we need to join if our subscription has changed since the last join
         if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
+            requestRejoin();
             return true;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9e6128b..afef951 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -843,6 +843,64 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    /**
+     * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails
+     * and metadata reverts to its original value, the consumer should still retry JoinGroup.
+     */
+    @Test
+    public void testRebalanceWithMetadataChange() {
+        final String consumerId = "leader";
+        final List<String> topics = Arrays.asList(topic1, topic2);
+        final List<TopicPartition> partitions = Arrays.asList(t1p, t2p);
+        subscriptions.subscribe(toSet(topics), rebalanceListener);
+        client.updateMetadata(TestUtils.metadataUpdateWith(1,
+                Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
+        coordinator.maybeUpdateSubscriptionMetadata();
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<String, List<String>> initialSubscription = singletonMap(consumerId, topics);
+        partitionAssignor.prepare(singletonMap(consumerId, partitions));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        // rejoin will only be set in the next poll call
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(toSet(topics), subscriptions.subscription());
+        assertEquals(toSet(partitions), subscriptions.assignedPartitions());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+
+        // Change metadata to trigger rebalance.
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
+        coordinator.poll(time.timer(0));
+
+        // Revert metadata to original value. Fail pending JoinGroup. Another
+        // JoinGroup should be sent, which will be completed successfully.
+
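
The diff above is truncated by the digest, but the core idea of the patch is that rejoinNeededOrPending() must latch a rejoin request rather than only report the current metadata mismatch. Below is a minimal, hypothetical Java model (invented class, field names, and snapshot representation, not the real ConsumerCoordinator API) of that latching behaviour, assuming a failed JoinGroup is followed by metadata reverting to its pre-rebalance value:

```java
import java.util.Objects;

// Hypothetical, simplified model of the rejoin logic described in the commit message.
final class RejoinLatchSketch {
    private String assignmentSnapshot = "topic1,topic2"; // metadata captured at the last assignment
    private String metadataSnapshot = "topic1,topic2";   // current cluster metadata
    private boolean rejoinNeeded = false;                 // latched request to send JoinGroup again

    void requestRejoin() {
        rejoinNeeded = true;
    }

    boolean rejoinNeededOrPending() {
        if (assignmentSnapshot != null && !Objects.equals(assignmentSnapshot, metadataSnapshot)) {
            requestRejoin(); // the fix: remember that a rejoin is owed, not just report it once
            return true;
        }
        return rejoinNeeded;
    }

    public static void main(String[] args) {
        RejoinLatchSketch coordinator = new RejoinLatchSketch();

        coordinator.metadataSnapshot = "topic1";                  // metadata change triggers a rebalance
        System.out.println(coordinator.rejoinNeededOrPending());  // true: JoinGroup is sent (and then fails)

        coordinator.metadataSnapshot = "topic1,topic2";           // metadata reverts to the original value
        System.out.println(coordinator.rejoinNeededOrPending());  // still true thanks to the latch
    }
}
```

With the requestRejoin() call in place the second check still returns true and the consumer retries; without it, the reverted snapshot matches again and the rejoin would be silently dropped.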

[kafka] branch 2.5 updated: KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)

2020-04-06 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
 new fd6bd32  KAFKA-9815; Ensure consumer always re-joins if JoinGroup 
fails (#8420)
fd6bd32 is described below

commit fd6bd32c7edbd779b7dd7848c6726536954e78e3
Author: Rajini Sivaram 
AuthorDate: Tue Apr 7 01:00:11 2020 +0100

KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)

On metadata change for assigned topics, we trigger rebalance, revoke 
partitions and send JoinGroup. If metadata reverts to the original value and 
JoinGroup fails, we don't resend JoinGroup because we don't set `rejoinNeeded`. 
This PR sets `rejoinNeeded=true` when rebalance is triggered due to metadata 
change to ensure that we retry on failure.

Reviewers: Boyang Chen , Chia-Ping Tsai , Jason Gustafson 
---
 .../consumer/internals/ConsumerCoordinator.java|  5 +-
 .../internals/ConsumerCoordinatorTest.java | 58 ++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 283e769..8faa34a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -739,11 +739,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // we need to rejoin if we performed the assignment and metadata has changed;
         // also for those owned-but-no-longer-existed partitions we should drop them as lost
-        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
+        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
+            requestRejoin();
             return true;
+        }
 
         // we need to join if our subscription has changed since the last join
         if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
+            requestRejoin();
             return true;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 06491eb..abf2938 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -845,6 +845,64 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    /**
+     * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails
+     * and metadata reverts to its original value, the consumer should still retry JoinGroup.
+     */
+    @Test
+    public void testRebalanceWithMetadataChange() {
+        final String consumerId = "leader";
+        final List<String> topics = Arrays.asList(topic1, topic2);
+        final List<TopicPartition> partitions = Arrays.asList(t1p, t2p);
+        subscriptions.subscribe(toSet(topics), rebalanceListener);
+        client.updateMetadata(TestUtils.metadataUpdateWith(1,
+                Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
+        coordinator.maybeUpdateSubscriptionMetadata();
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<String, List<String>> initialSubscription = singletonMap(consumerId, topics);
+        partitionAssignor.prepare(singletonMap(consumerId, partitions));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        // rejoin will only be set in the next poll call
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(toSet(topics), subscriptions.subscription());
+        assertEquals(toSet(partitions), subscriptions.assignedPartitions());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+
+        // Change metadata to trigger rebalance.
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
+        coordinator.poll(time.timer(0));
+
+        // Revert metadata to original value. Fail pending JoinGroup. Another
+        // JoinGroup should be sent, which will be completed successfully.
+

[kafka] branch trunk updated: KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)

2020-04-06 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 588e8a5  KAFKA-9815; Ensure consumer always re-joins if JoinGroup 
fails (#8420)
588e8a5 is described below

commit 588e8a5be83037225716a78358d0203a9e070168
Author: Rajini Sivaram 
AuthorDate: Tue Apr 7 01:00:11 2020 +0100

KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)

On metadata change for assigned topics, we trigger rebalance, revoke 
partitions and send JoinGroup. If metadata reverts to the original value and 
JoinGroup fails, we don't resend JoinGroup because we don't set `rejoinNeeded`. 
This PR sets `rejoinNeeded=true` when rebalance is triggered due to metadata 
change to ensure that we retry on failure.

Reviewers: Boyang Chen , Chia-Ping Tsai , Jason Gustafson 
---
 .../consumer/internals/ConsumerCoordinator.java|  5 +-
 .../internals/ConsumerCoordinatorTest.java | 58 ++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b8d44b4..ab49837 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -747,11 +747,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // we need to rejoin if we performed the assignment and metadata has changed;
         // also for those owned-but-no-longer-existed partitions we should drop them as lost
-        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
+        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
+            requestRejoin();
             return true;
+        }
 
         // we need to join if our subscription has changed since the last join
         if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
+            requestRejoin();
             return true;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9a189a8..aaf2962 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -974,6 +974,64 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    /**
+     * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails
+     * and metadata reverts to its original value, the consumer should still retry JoinGroup.
+     */
+    @Test
+    public void testRebalanceWithMetadataChange() {
+        final String consumerId = "leader";
+        final List<String> topics = Arrays.asList(topic1, topic2);
+        final List<TopicPartition> partitions = Arrays.asList(t1p, t2p);
+        subscriptions.subscribe(toSet(topics), rebalanceListener);
+        client.updateMetadata(TestUtils.metadataUpdateWith(1,
+                Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
+        coordinator.maybeUpdateSubscriptionMetadata();
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<String, List<String>> initialSubscription = singletonMap(consumerId, topics);
+        partitionAssignor.prepare(singletonMap(consumerId, partitions));
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        // rejoin will only be set in the next poll call
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(toSet(topics), subscriptions.subscription());
+        assertEquals(toSet(partitions), subscriptions.assignedPartitions());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+
+        // Change metadata to trigger rebalance.
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
+        coordinator.poll(time.timer(0));
+
+        // Revert metadata to original value. Fail pending JoinGroup. Another
+        // JoinGroup should be sent, which will be completed successfully.
+

[kafka] branch trunk updated (712ac52 -> 82dff1d)

2020-04-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 712ac52  KAFKA-9793: Expand the try-catch for task commit in 
HandleAssignment (#8402)
 add 82dff1d  KAFKA-9753: A few more metrics to add (#8371)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/KafkaStreams.java |  15 +-
 .../streams/internals/metrics/ClientMetrics.java   |  12 ++
 .../streams/processor/internals/StreamTask.java|   5 +-
 .../streams/processor/internals/StreamThread.java  |  13 +-
 .../kafka/streams/processor/internals/Task.java|   2 +-
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../internals/metrics/StreamsMetricsImpl.java  |   1 +
 .../processor/internals/metrics/TaskMetrics.java   |  20 +-
 .../processor/internals/metrics/ThreadMetrics.java |  39 
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   1 +
 .../integration/MetricsIntegrationTest.java| 106 +-
 .../internals/metrics/ClientMetricsTest.java   |  45 +++-
 .../kstream/internals/metrics/TaskMetricsTest.java |  24 +++
 .../processor/internals/StreamTaskTest.java|  49 -
 .../processor/internals/StreamThreadTest.java  |  24 ++-
 .../internals/metrics/ThreadMetricsTest.java   | 230 +
 16 files changed, 418 insertions(+), 170 deletions(-)