kafka git commit: KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers

2017-06-07 Thread rsivaram
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 3d0f0caf7 -> 2f5b652a3


KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers

1. Fix ordering of metadata update request for regex subscription to avoid 
timing issue when heartbeat thread updates metadata
2. Override metadata cluster in MockClient for 
`KafkaConsumer#testChangingRegexSubscription` to avoid timing issues during 
update
3. Close consumer in all KafkaConsumer tests since they leave behind heartbeat 
threads.

Author: Rajini Sivaram 

Reviewers: Jason Gustafson , Ismael Juma 

Closes #3238 from rajinisivaram/KAFKA-5380

(cherry picked from commit bb914a0445549b41fb0b667ea2f5f15a90060133)
Signed-off-by: Rajini Sivaram 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2f5b652a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2f5b652a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2f5b652a

Branch: refs/heads/0.11.0
Commit: 2f5b652a3531b2f5c4e570d25359269a750c90b7
Parents: 3d0f0ca
Author: Rajini Sivaram 
Authored: Wed Jun 7 12:51:53 2017 +0100
Committer: Rajini Sivaram 
Committed: Wed Jun 7 12:52:28 2017 +0100

--
 .../apache/kafka/clients/consumer/KafkaConsumer.java |  2 +-
 .../java/org/apache/kafka/clients/MockClient.java|  7 +++
 .../kafka/clients/consumer/KafkaConsumerTest.java| 15 +++
 3 files changed, 19 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 055712e..1d5ff98 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -903,8 +903,8 @@ public class KafkaConsumer implements Consumer {
 log.debug("Subscribed to pattern: {}", pattern);
 this.subscriptions.subscribe(pattern, listener);
 this.metadata.needMetadataForAllTopics(true);
-this.metadata.requestUpdate();
 this.coordinator.updatePatternSubscription(metadata.fetch());
+this.metadata.requestUpdate();
 } finally {
 release();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
--
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index ce3c599..3a5adee 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -65,6 +65,7 @@ public class MockClient implements KafkaClient {
 private final Time time;
 private final Metadata metadata;
 private Set unavailableTopics;
+private Cluster cluster;
 private Node node = null;
 private final Set ready = new HashSet<>();
 private final Map blackedOut = new HashMap<>();
@@ -170,6 +171,8 @@ public class MockClient implements KafkaClient {
 
 if (metadata != null && metadata.updateRequested()) {
 MetadataUpdate metadataUpdate = metadataUpdates.poll();
+if (cluster != null)
+metadata.update(cluster, this.unavailableTopics, 
time.milliseconds());
 if (metadataUpdate == null)
 metadata.update(metadata.fetch(), this.unavailableTopics, 
time.milliseconds());
 else {
@@ -303,6 +306,10 @@ public class MockClient implements KafkaClient {
 this.node = node;
 }
 
+public void cluster(Cluster cluster) {
+this.cluster = cluster;
+}
+
 @Override
 public int inFlightRequestCount() {
 return requests.size();

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f5b652a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
--
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 94979e7..45fccb7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 

kafka git commit: KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers

2017-06-07 Thread rsivaram
Repository: kafka
Updated Branches:
  refs/heads/trunk b01c83ea9 -> bb914a044


KAFKA-5380: Fix transient failure in KafkaConsumerTest, close consumers

1. Fix ordering of metadata update request for regex subscription to avoid 
timing issue when heartbeat thread updates metadata
2. Override metadata cluster in MockClient for 
`KafkaConsumer#testChangingRegexSubscription` to avoid timing issues during 
update
3. Close consumer in all KafkaConsumer tests since they leave behind heartbeat 
threads.

Author: Rajini Sivaram 

Reviewers: Jason Gustafson , Ismael Juma 

Closes #3238 from rajinisivaram/KAFKA-5380


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb914a04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb914a04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb914a04

Branch: refs/heads/trunk
Commit: bb914a0445549b41fb0b667ea2f5f15a90060133
Parents: b01c83e
Author: Rajini Sivaram 
Authored: Wed Jun 7 12:51:53 2017 +0100
Committer: Rajini Sivaram 
Committed: Wed Jun 7 12:51:53 2017 +0100

--
 .../apache/kafka/clients/consumer/KafkaConsumer.java |  2 +-
 .../java/org/apache/kafka/clients/MockClient.java|  7 +++
 .../kafka/clients/consumer/KafkaConsumerTest.java| 15 +++
 3 files changed, 19 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/bb914a04/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 055712e..1d5ff98 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -903,8 +903,8 @@ public class KafkaConsumer implements Consumer {
 log.debug("Subscribed to pattern: {}", pattern);
 this.subscriptions.subscribe(pattern, listener);
 this.metadata.needMetadataForAllTopics(true);
-this.metadata.requestUpdate();
 this.coordinator.updatePatternSubscription(metadata.fetch());
+this.metadata.requestUpdate();
 } finally {
 release();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb914a04/clients/src/test/java/org/apache/kafka/clients/MockClient.java
--
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index ce3c599..3a5adee 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -65,6 +65,7 @@ public class MockClient implements KafkaClient {
 private final Time time;
 private final Metadata metadata;
 private Set unavailableTopics;
+private Cluster cluster;
 private Node node = null;
 private final Set ready = new HashSet<>();
 private final Map blackedOut = new HashMap<>();
@@ -170,6 +171,8 @@ public class MockClient implements KafkaClient {
 
 if (metadata != null && metadata.updateRequested()) {
 MetadataUpdate metadataUpdate = metadataUpdates.poll();
+if (cluster != null)
+metadata.update(cluster, this.unavailableTopics, 
time.milliseconds());
 if (metadataUpdate == null)
 metadata.update(metadata.fetch(), this.unavailableTopics, 
time.milliseconds());
 else {
@@ -303,6 +306,10 @@ public class MockClient implements KafkaClient {
 this.node = node;
 }
 
+public void cluster(Cluster cluster) {
+this.cluster = cluster;
+}
+
 @Override
 public int inFlightRequestCount() {
 return requests.size();

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb914a04/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
--
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 94979e7..45fccb7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -396,6 +396,7 @@ public class KafkaConsumerTest {
 consumer.poll(0);