This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f3a3253  HOTFIX: Fix reset integration test hangs on busy wait (#4491)
f3a3253 is described below

commit f3a3253e24d63cbfcbd42c8a550a3f898c734044
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Wed Feb 14 08:49:23 2018 -0800

    HOTFIX: Fix reset integration test hangs on busy wait (#4491)
    
    * do not use static properties
    * use new object to take appID
    * capture timeout exception inside condition
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../kafka/common/network/SslTransportLayer.java    |  10 +-
 core/src/main/scala/kafka/admin/AdminClient.scala  |  10 +-
 .../integration/AbstractResetIntegrationTest.java  | 208 +++++++++++----------
 .../streams/integration/ResetIntegrationTest.java  |   8 +-
 .../integration/ResetIntegrationWithSslTest.java   |  10 +-
 .../integration/utils/EmbeddedKafkaCluster.java    |   2 +-
 streams/src/test/resources/log4j.properties        |   2 +-
 7 files changed, 137 insertions(+), 113 deletions(-)
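
Of the commit-message bullets above, the last one is the heart of the hang fix: the group-inactivity check is polled through TestUtils.waitForCondition(), and the condition now treats a TimeoutException from the admin client as "not met yet" instead of letting it escape the poll loop. A minimal sketch of the pattern, mirroring the ConsumerGroupInactiveCondition introduced in AbstractResetIntegrationTest below (the surrounding test harness and the adminClient/appID fields are assumed from that class):

    private class ConsumerGroupInactiveCondition implements TestCondition {
        @Override
        public boolean conditionMet() {
            try {
                // the group counts as inactive once it has no live members
                return adminClient.describeConsumerGroup(appID, 0).consumers().get().isEmpty();
            } catch (final TimeoutException e) {
                // coordinator not reachable yet: report "not met" so that
                // waitForCondition() keeps polling until its own deadline
                return false;
            }
        }
    }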

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 49f1d66..704a198 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -36,16 +36,14 @@ import javax.net.ssl.SSLPeerUnverifiedException;
 
 import org.apache.kafka.common.errors.SslAuthenticationException;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /*
  * Transport layer for SSL communication
  */
 public class SslTransportLayer implements TransportLayer {
-    private static final Logger log = LoggerFactory.getLogger(SslTransportLayer.class);
-
     private enum State {
         HANDSHAKE,
         HANDSHAKE_FAILED,
@@ -57,6 +55,7 @@ public class SslTransportLayer implements TransportLayer {
     private final SSLEngine sslEngine;
     private final SelectionKey key;
     private final SocketChannel socketChannel;
+    private final Logger log;
 
     private HandshakeStatus handshakeStatus;
     private SSLEngineResult handshakeResult;
@@ -79,6 +78,9 @@ public class SslTransportLayer implements TransportLayer {
         this.key = key;
         this.socketChannel = (SocketChannel) key.channel();
         this.sslEngine = sslEngine;
+
+        final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key));
+        this.log = logContext.logger(getClass());
     }
 
     // Visible for testing
@@ -172,7 +174,7 @@ public class SslTransportLayer implements TransportLayer {
                 flush(netWriteBuffer);
             }
         } catch (IOException ie) {
-            log.warn("Failed to send SSL Close message ", ie);
+            log.warn("Failed to send SSL Close message", ie);
         } finally {
             socketChannel.socket().close();
             socketChannel.close();
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 772277f..c010ba0 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -528,18 +528,20 @@ object AdminClient {
     val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
     metadata.update(bootstrapCluster, Collections.emptySet(), 0)
 
+    val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
+
     val selector = new Selector(
       DefaultConnectionMaxIdleMs,
       metrics,
       time,
       "admin",
       channelBuilder,
-      new LogContext())
+      new LogContext(String.format("[Producer clientId=%s] ", clientId)))
 
     val networkClient = new NetworkClient(
       selector,
       metadata,
-      "admin-" + AdminClientIdSequence.getAndIncrement(),
+      clientId,
       DefaultMaxInFlightRequestsPerConnection,
       DefaultReconnectBackoffMs,
       DefaultReconnectBackoffMax,
@@ -549,10 +551,10 @@ object AdminClient {
       time,
       true,
       new ApiVersions,
-      new LogContext())
+      new LogContext(String.format("[NetworkClient clientId=%s] ", clientId)))
 
     val highLevelClient = new ConsumerNetworkClient(
-      new LogContext(),
+      new LogContext(String.format("[ConsumerNetworkClient clientId=%s] ", 
clientId)),
       networkClient,
       metadata,
       time,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 05ccd63..758c4f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -47,7 +48,6 @@ import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
@@ -68,17 +68,18 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-@Ignore
 @Category({IntegrationTest.class})
 public abstract class AbstractResetIntegrationTest {
     static String testId;
     static EmbeddedKafkaCluster cluster;
-    static Map<String, Object> sslConfig = null;
-    private static KafkaStreams streams;
+
     private static MockTime mockTime;
+    private static KafkaStreams streams;
     private static AdminClient adminClient = null;
     private static KafkaAdminClient kafkaAdminClient = null;
 
+    abstract Map<String, Object> getClientSslConfig();
+
     @AfterClass
     public static void afterClassCleanup() {
         if (adminClient != null) {
@@ -91,15 +92,18 @@ public abstract class AbstractResetIntegrationTest {
         }
     }
 
-    private String appID;
+    private String appID = "abstract-reset-integration-test";
     private Properties commonClientConfig;
+    private Properties streamsConfig;
+    private Properties producerConfig;
+    private Properties resultConsumerConfig;
 
     private void prepareEnvironment() {
         if (adminClient == null) {
             adminClient = AdminClient.create(commonClientConfig);
         }
         if (kafkaAdminClient == null) {
-            kafkaAdminClient =  (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
+            kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
         }
 
         // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
@@ -113,34 +117,38 @@ public abstract class AbstractResetIntegrationTest {
         commonClientConfig = new Properties();
         commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());

+        Map<String, Object> sslConfig = getClientSslConfig();
         if (sslConfig != null) {
             commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
             commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
             commonClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
         }
 
-        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
-        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
-        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        PRODUCER_CONFIG.putAll(commonClientConfig);
-
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer");
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        RESULT_CONSUMER_CONFIG.putAll(commonClientConfig);
-
-        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
-        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
-        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        STREAMS_CONFIG.putAll(commonClientConfig);
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
+        streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        streamsConfig.putAll(commonClientConfig);
     }
 
     @Rule
@@ -157,24 +165,24 @@ public abstract class AbstractResetIntegrationTest {
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
     private static final int TIMEOUT_MULTIPLIER = 5;
 
-    private final TestCondition consumerGroupInactiveCondition = new TestCondition() {
+    private class ConsumerGroupInactiveCondition implements TestCondition {
         @Override
         public boolean conditionMet() {
-            return adminClient.describeConsumerGroup(testId + "-result-consumer", 0).consumers().get().isEmpty();
+            try {
+                return adminClient.describeConsumerGroup(appID, 0).consumers().get().isEmpty();
+            } catch (final TimeoutException e) {
+                return false;
+            }
         }
-    };
-
-    private static final Properties STREAMS_CONFIG = new Properties();
-    private final static Properties PRODUCER_CONFIG = new Properties();
-    private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+    }
 
     void prepareTest() throws Exception {
         prepareConfigs();
         prepareEnvironment();
 
         // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-                "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Test consumer group " + appID + " still active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
 
@@ -185,7 +193,7 @@ public abstract class AbstractResetIntegrationTest {
         if (streams != null) {
             streams.close(30, TimeUnit.SECONDS);
         }
-        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
     }
 
     private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
@@ -202,7 +210,7 @@ public abstract class AbstractResetIntegrationTest {
 
         for (KeyValue<Long, String> record : records) {
             mockTime.sleep(10);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), PRODUCER_CONFIG, mockTime.milliseconds());
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds());
         }
     }
 
@@ -216,10 +224,10 @@ public abstract class AbstractResetIntegrationTest {
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
 
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.start();
 
         final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
@@ -258,35 +266,35 @@ public abstract class AbstractResetIntegrationTest {
 
     void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
         appID = testId + "-from-scratch";
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.cleanUp();
         cleanGlobal(false, null, null);
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
         cleanGlobal(false, null, null);
     }
 
@@ -294,19 +302,19 @@ public abstract class AbstractResetIntegrationTest {
         cluster.createTopic(INTERMEDIATE_USER_TOPIC);
 
         appID = testId + "-from-scratch-with-intermediate-topic";
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfig);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         // receive only first values to make sure intermediate user topic is not consumed completely
         // => required to test "seekToEnd" for intermediate topics
-        final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2, 40);
+        final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40);

         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // insert bad record to make sure intermediate user topic gets seekToEnd()
         mockTime.sleep(1);
@@ -314,22 +322,22 @@ public abstract class AbstractResetIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             INTERMEDIATE_USER_TOPIC,
             Collections.singleton(badMessage),
-            PRODUCER_CONFIG,
+                producerConfig,
             mockTime.milliseconds());
 
         // RESET
-        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig);
         streams.cleanUp();
         cleanGlobal(true, null, null);
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
-        final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2_RERUN, 40);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
@@ -343,8 +351,8 @@ public abstract class AbstractResetIntegrationTest {
         }
         assertThat(resultIntermediate.get(10), equalTo(badMessage));
 
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
         cleanGlobal(true, null, null);
 
         cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
@@ -352,16 +360,16 @@ public abstract class AbstractResetIntegrationTest {
 
     void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
         appID = testId + "-from-file";
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
         final File resetFile = File.createTempFile("reset", ".csv");
@@ -370,12 +378,12 @@ public abstract class AbstractResetIntegrationTest {
             writer.close();
         }
 
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.cleanUp();

         cleanGlobal(false, "--from-file", resetFile.getAbsolutePath());
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
 
@@ -383,29 +391,29 @@ public abstract class AbstractResetIntegrationTest {
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 5);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 5);
         streams.close();
 
         result.remove(0);
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
         cleanGlobal(false, null, null);
     }
 
     void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
         appID = testId + "-from-datetime";
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
         final File resetFile = File.createTempFile("reset", ".csv");
@@ -414,7 +422,7 @@ public abstract class AbstractResetIntegrationTest {
             writer.close();
         }
 
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.cleanUp();
 
 
@@ -423,8 +431,8 @@ public abstract class AbstractResetIntegrationTest {
         calendar.add(Calendar.DATE, -1);
 
         cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()));
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
 
@@ -432,28 +440,28 @@ public abstract class AbstractResetIntegrationTest {
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
         cleanGlobal(false, null, null);
     }
 
     void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
         appID = testId + "-from-duration";
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group " + appID + "  did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
         final File resetFile = File.createTempFile("reset", ".csv");
@@ -462,12 +470,12 @@ public abstract class AbstractResetIntegrationTest {
             writer.close();
         }
 
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.cleanUp();
         cleanGlobal(false, "--by-duration", "PT1M");

-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
 
@@ -475,13 +483,13 @@ public abstract class AbstractResetIntegrationTest {
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
         cleanGlobal(false, null, null);
     }
 
@@ -546,6 +554,8 @@ public abstract class AbstractResetIntegrationTest {
             parameterList.add("--intermediate-topics");
             parameterList.add(INTERMEDIATE_USER_TOPIC);
         }
+
+        final Map<String, Object> sslConfig = getClientSslConfig();
         if (sslConfig != null) {
             final File configFile = TestUtils.tempFile();
             final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 1daeca9..ed04710 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,17 +23,16 @@ import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.Map;
 import java.util.Properties;
 
 
 /**
  * Tests local state store and global application cleanup.
  */
-@Ignore
 @Category({IntegrationTest.class})
 public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
@@ -51,6 +50,11 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
     }
 
+    @Override
+    Map<String, Object> getClientSslConfig() {
+        return null;
+    }
+
     @Before
     public void before() throws Exception {
         testId = TEST_ID;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
index 4bc5454..b66e042 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -25,16 +25,15 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.Map;
 import java.util.Properties;
 
 /**
  * Tests command line SSL setup for reset tool.
  */
-@Ignore
 @Category({IntegrationTest.class})
 public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
 
@@ -43,6 +42,8 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
 
     private static final String TEST_ID = "reset-with-ssl-integration-test";
 
+    private static Map<String, Object> sslConfig;
+
     static {
         final Properties brokerProps = new Properties();
         // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
@@ -63,6 +64,11 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
         CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
     }
 
+    @Override
+    Map<String, Object> getClientSslConfig() {
+        return sslConfig;
+    }
+
     @Before
     public void before() throws Exception {
         testId = TEST_ID;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 367e489..c33a720 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -104,7 +104,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
         for (int i = 0; i < brokers.length; i++) {
             brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
-            log.debug("Starting a Kafka instance on port {} ...", 
brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
+            log.debug("Starting a Kafka instance on port {} ...", 
brokerConfig.get(KafkaConfig$.MODULE$.PortProp()));
             brokers[i] = new KafkaEmbedded(brokerConfig, time);
 
             log.debug("Kafka instance is running at {}, connected to ZooKeeper 
at {}",
diff --git a/streams/src/test/resources/log4j.properties b/streams/src/test/resources/log4j.properties
index 91c909b..be36f90 100644
--- a/streams/src/test/resources/log4j.properties
+++ b/streams/src/test/resources/log4j.properties
@@ -18,4 +18,4 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.org.apache.kafka=INFO
\ No newline at end of file
+log4j.logger.org.apache.kafka=INFO

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.
