kafka git commit: KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers
Repository: kafka Updated Branches: refs/heads/0.11.0 3d0f0caf7 -> 2f5b652a3 KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers 1. Fix ordering of metadata update request for regex subscription to avoid timing issue when heartbeat thread updates metadata 2. Override metadata cluster in MockClient for `KafkaConsumer#testChangingRegexSubscription` to avoid timing issues during update 3. Close consumer in all KafkaConsumer tests since they leave behind heartbeat threads. Author: Rajini SivaramReviewers: Jason Gustafson , Ismael Juma Closes #3238 from rajinisivaram/KAFKA-5380 (cherry picked from commit bb914a0445549b41fb0b667ea2f5f15a90060133) Signed-off-by: Rajini Sivaram Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2f5b652a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2f5b652a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2f5b652a Branch: refs/heads/0.11.0 Commit: 2f5b652a3531b2f5c4e570d25359269a750c90b7 Parents: 3d0f0ca Author: Rajini Sivaram Authored: Wed Jun 7 12:51:53 2017 +0100 Committer: Rajini Sivaram Committed: Wed Jun 7 12:52:28 2017 +0100 -- .../apache/kafka/clients/consumer/KafkaConsumer.java | 2 +- .../java/org/apache/kafka/clients/MockClient.java| 7 +++ .../kafka/clients/consumer/KafkaConsumerTest.java| 15 +++ 3 files changed, 19 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 055712e..1d5ff98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -903,8 +903,8 @@ public class KafkaConsumer implements Consumer { log.debug("Subscribed to pattern: {}", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); -this.metadata.requestUpdate(); this.coordinator.updatePatternSubscription(metadata.fetch()); +this.metadata.requestUpdate(); } finally { release(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/test/java/org/apache/kafka/clients/MockClient.java -- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index ce3c599..3a5adee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -65,6 +65,7 @@ public class MockClient implements KafkaClient { private final Time time; private final Metadata metadata; private Set unavailableTopics; +private Cluster cluster; private Node node = null; private final Set ready = new HashSet<>(); private final Map blackedOut = new HashMap<>(); @@ -170,6 +171,8 @@ public class MockClient implements KafkaClient { if (metadata != null && metadata.updateRequested()) { MetadataUpdate metadataUpdate = metadataUpdates.poll(); +if (cluster != null) +metadata.update(cluster, this.unavailableTopics, time.milliseconds()); if (metadataUpdate == null) metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds()); else { @@ -303,6 +306,10 @@ public class MockClient implements KafkaClient { this.node = node; } +public void cluster(Cluster cluster) { +this.cluster = cluster; +} + @Override public int inFlightRequestCount() { return requests.size(); http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java -- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 94979e7..45fccb7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++
kafka git commit: KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers
Repository: kafka Updated Branches: refs/heads/trunk b01c83ea9 -> bb914a044 KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers 1. Fix ordering of metadata update request for regex subscription to avoid timing issue when heartbeat thread updates metadata 2. Override metadata cluster in MockClient for `KafkaConsumer#testChangingRegexSubscription` to avoid timing issues during update 3. Close consumer in all KafkaConsumer tests since they leave behind heartbeat threads. Author: Rajini SivaramReviewers: Jason Gustafson , Ismael Juma Closes #3238 from rajinisivaram/KAFKA-5380 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb914a04 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb914a04 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb914a04 Branch: refs/heads/trunk Commit: bb914a0445549b41fb0b667ea2f5f15a90060133 Parents: b01c83e Author: Rajini Sivaram Authored: Wed Jun 7 12:51:53 2017 +0100 Committer: Rajini Sivaram Committed: Wed Jun 7 12:51:53 2017 +0100 -- .../apache/kafka/clients/consumer/KafkaConsumer.java | 2 +- .../java/org/apache/kafka/clients/MockClient.java| 7 +++ .../kafka/clients/consumer/KafkaConsumerTest.java| 15 +++ 3 files changed, 19 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/bb914a04/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 055712e..1d5ff98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -903,8 +903,8 @@ public class KafkaConsumer implements Consumer { log.debug("Subscribed to pattern: {}", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); -this.metadata.requestUpdate(); this.coordinator.updatePatternSubscription(metadata.fetch()); +this.metadata.requestUpdate(); } finally { release(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bb914a04/clients/src/test/java/org/apache/kafka/clients/MockClient.java -- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index ce3c599..3a5adee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -65,6 +65,7 @@ public class MockClient implements KafkaClient { private final Time time; private final Metadata metadata; private Set unavailableTopics; +private Cluster cluster; private Node node = null; private final Set ready = new HashSet<>(); private final Map blackedOut = new HashMap<>(); @@ -170,6 +171,8 @@ public class MockClient implements KafkaClient { if (metadata != null && metadata.updateRequested()) { MetadataUpdate metadataUpdate = metadataUpdates.poll(); +if (cluster != null) +metadata.update(cluster, this.unavailableTopics, time.milliseconds()); if (metadataUpdate == null) metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds()); else { @@ -303,6 +306,10 @@ public class MockClient implements KafkaClient { this.node = node; } +public void cluster(Cluster cluster) { +this.cluster = cluster; +} + @Override public int inFlightRequestCount() { return requests.size(); http://git-wip-us.apache.org/repos/asf/kafka/blob/bb914a04/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java -- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 94979e7..45fccb7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -396,6 +396,7 @@ public class KafkaConsumerTest { consumer.poll(0);