This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f3a3253 HOTFIX: Fix reset integration test hangs on busy wait (#4491) f3a3253 is described below commit f3a3253e24d63cbfcbd42c8a550a3f898c734044 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Wed Feb 14 08:49:23 2018 -0800 HOTFIX: Fix reset integration test hangs on busy wait (#4491) * do not use static properties * use new object to take appID * capture timeout exception inside condition Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../kafka/common/network/SslTransportLayer.java | 10 +- core/src/main/scala/kafka/admin/AdminClient.scala | 10 +- .../integration/AbstractResetIntegrationTest.java | 208 +++++++++++---------- .../streams/integration/ResetIntegrationTest.java | 8 +- .../integration/ResetIntegrationWithSslTest.java | 10 +- .../integration/utils/EmbeddedKafkaCluster.java | 2 +- streams/src/test/resources/log4j.properties | 2 +- 7 files changed, 137 insertions(+), 113 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 49f1d66..704a198 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -36,16 +36,14 @@ import javax.net.ssl.SSLPeerUnverifiedException; import org.apache.kafka.common.errors.SslAuthenticationException; import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /* * Transport layer for SSL communication */ public class SslTransportLayer implements TransportLayer { - private static final Logger log = LoggerFactory.getLogger(SslTransportLayer.class); - private enum State { HANDSHAKE, HANDSHAKE_FAILED, @@ -57,6 +55,7 @@ public class SslTransportLayer implements TransportLayer { private final SSLEngine sslEngine; private final SelectionKey key; private final SocketChannel socketChannel; + private final Logger log; private HandshakeStatus handshakeStatus; private SSLEngineResult handshakeResult; @@ -79,6 +78,9 @@ public class SslTransportLayer implements TransportLayer { this.key = key; this.socketChannel = (SocketChannel) key.channel(); this.sslEngine = sslEngine; + + final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key)); + this.log = logContext.logger(getClass()); } // Visible for testing @@ -172,7 +174,7 @@ public class SslTransportLayer implements TransportLayer { flush(netWriteBuffer); } } catch (IOException ie) { - log.warn("Failed to send SSL Close message ", ie); + log.warn("Failed to send SSL Close message", ie); } finally { socketChannel.socket().close(); socketChannel.close(); diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 772277f..c010ba0 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -528,18 +528,20 @@ object AdminClient { val bootstrapCluster = Cluster.bootstrap(brokerAddresses) metadata.update(bootstrapCluster, Collections.emptySet(), 0) + val clientId = "admin-" + AdminClientIdSequence.getAndIncrement() + val selector = new Selector( DefaultConnectionMaxIdleMs, metrics, time, "admin", channelBuilder, - new LogContext()) + new LogContext(String.format("[Producer clientId=%s] ", clientId))) val networkClient = new NetworkClient( selector, metadata, - "admin-" + AdminClientIdSequence.getAndIncrement(), + clientId, DefaultMaxInFlightRequestsPerConnection, DefaultReconnectBackoffMs, DefaultReconnectBackoffMax, @@ -549,10 +551,10 @@ object AdminClient { time, true, new ApiVersions, - new LogContext()) + new LogContext(String.format("[NetworkClient clientId=%s] ", clientId))) val highLevelClient = new ConsumerNetworkClient( - new LogContext(), + new LogContext(String.format("[ConsumerNetworkClient clientId=%s] ", clientId)), networkClient, metadata, time, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index 05ccd63..758c4f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.serialization.LongDeserializer; @@ -47,7 +48,6 @@ import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Rule; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; @@ -68,17 +68,18 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -@Ignore @Category({IntegrationTest.class}) public abstract class AbstractResetIntegrationTest { static String testId; static EmbeddedKafkaCluster cluster; - static Map<String, Object> sslConfig = null; - private static KafkaStreams streams; + private static MockTime mockTime; + private static KafkaStreams streams; private static AdminClient adminClient = null; private static KafkaAdminClient kafkaAdminClient = null; + abstract Map<String, Object> getClientSslConfig(); + @AfterClass public static void afterClassCleanup() { if (adminClient != null) { @@ -91,15 +92,18 @@ public abstract class AbstractResetIntegrationTest { } } - private String appID; + private String appID = "abstract-reset-integration-test"; private Properties commonClientConfig; + private Properties streamsConfig; + private Properties producerConfig; + private Properties resultConsumerConfig; private void prepareEnvironment() { if (adminClient == null) { adminClient = AdminClient.create(commonClientConfig); } if (kafkaAdminClient == null) { - kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig); + kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig); } // we align time to seconds to get clean window boundaries and thus ensure the same result for each run @@ -113,34 +117,38 @@ public abstract class AbstractResetIntegrationTest { commonClientConfig = new Properties(); commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + Map<String, Object> sslConfig = getClientSslConfig(); if (sslConfig != null) { commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); commonClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); } - PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all"); - PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0); - PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - PRODUCER_CONFIG.putAll(commonClientConfig); - - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer"); - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - RESULT_CONSUMER_CONFIG.putAll(commonClientConfig); - - STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); - STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); - STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - STREAMS_CONFIG.putAll(commonClientConfig); + producerConfig = new Properties(); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerConfig.putAll(commonClientConfig); + + resultConsumerConfig = new Properties(); + resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer"); + resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + resultConsumerConfig.putAll(commonClientConfig); + + streamsConfig = new Properties(); + streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); + streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + streamsConfig.putAll(commonClientConfig); } @Rule @@ -157,24 +165,24 @@ public abstract class AbstractResetIntegrationTest { private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; private static final int TIMEOUT_MULTIPLIER = 5; - private final TestCondition consumerGroupInactiveCondition = new TestCondition() { + private class ConsumerGroupInactiveCondition implements TestCondition { @Override public boolean conditionMet() { - return adminClient.describeConsumerGroup(testId + "-result-consumer", 0).consumers().get().isEmpty(); + try { + return adminClient.describeConsumerGroup(appID, 0).consumers().get().isEmpty(); + } catch (final TimeoutException e) { + return false; + } } - }; - - private static final Properties STREAMS_CONFIG = new Properties(); - private final static Properties PRODUCER_CONFIG = new Properties(); - private final static Properties RESULT_CONSUMER_CONFIG = new Properties(); + } void prepareTest() throws Exception { prepareConfigs(); prepareEnvironment(); // busy wait until cluster (ie, ConsumerGroupCoordinator) is available - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Test consumer group " + appID + " still active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); @@ -185,7 +193,7 @@ public abstract class AbstractResetIntegrationTest { if (streams != null) { streams.close(30, TimeUnit.SECONDS); } - IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); + IntegrationTestUtils.purgeLocalStreamsState(streamsConfig); } private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException { @@ -202,7 +210,7 @@ public abstract class AbstractResetIntegrationTest { for (KeyValue<Long, String> record : records) { mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), PRODUCER_CONFIG, mockTime.milliseconds()); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), producerConfig, mockTime.milliseconds()); } } @@ -216,10 +224,10 @@ public abstract class AbstractResetIntegrationTest { cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.start(); final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); @@ -258,35 +266,35 @@ public abstract class AbstractResetIntegrationTest { void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { appID = testId + "-from-scratch"; - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.cleanUp(); cleanGlobal(false, null, null); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cleanGlobal(false, null, null); } @@ -294,19 +302,19 @@ public abstract class AbstractResetIntegrationTest { cluster.createTopic(INTERMEDIATE_USER_TOPIC); appID = testId + "-from-scratch-with-intermediate-topic"; - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfig); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); // receive only first values to make sure intermediate user topic is not consumed completely // => required to test "seekToEnd" for intermediate topics - final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2, 40); + final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40); streams.close(); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // insert bad record to make sure intermediate user topic gets seekToEnd() mockTime.sleep(1); @@ -314,22 +322,22 @@ public abstract class AbstractResetIntegrationTest { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( INTERMEDIATE_USER_TOPIC, Collections.singleton(badMessage), - PRODUCER_CONFIG, + producerConfig, mockTime.milliseconds()); // RESET - streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig); streams.cleanUp(); cleanGlobal(true, null, null); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); - final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2_RERUN, 40); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40); streams.close(); assertThat(resultRerun, equalTo(result)); @@ -343,8 +351,8 @@ public abstract class AbstractResetIntegrationTest { } assertThat(resultIntermediate.get(10), equalTo(badMessage)); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cleanGlobal(true, null, null); cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); @@ -352,16 +360,16 @@ public abstract class AbstractResetIntegrationTest { void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception { appID = testId + "-from-file"; - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET final File resetFile = File.createTempFile("reset", ".csv"); @@ -370,12 +378,12 @@ public abstract class AbstractResetIntegrationTest { writer.close(); } - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.cleanUp(); cleanGlobal(false, "--from-file", resetFile.getAbsolutePath()); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); @@ -383,29 +391,29 @@ public abstract class AbstractResetIntegrationTest { // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 5); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 5); streams.close(); result.remove(0); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cleanGlobal(false, null, null); } void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception { appID = testId + "-from-datetime"; - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET final File resetFile = File.createTempFile("reset", ".csv"); @@ -414,7 +422,7 @@ public abstract class AbstractResetIntegrationTest { writer.close(); } - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.cleanUp(); @@ -423,8 +431,8 @@ public abstract class AbstractResetIntegrationTest { calendar.add(Calendar.DATE, -1); cleanGlobal(false, "--to-datetime", format.format(calendar.getTime())); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); @@ -432,28 +440,28 @@ public abstract class AbstractResetIntegrationTest { // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cleanGlobal(false, null, null); } void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception { appID = testId + "-from-duration"; - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET final File resetFile = File.createTempFile("reset", ".csv"); @@ -462,12 +470,12 @@ public abstract class AbstractResetIntegrationTest { writer.close(); } - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.cleanUp(); cleanGlobal(false, "--by-duration", "PT1M"); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); @@ -475,13 +483,13 @@ public abstract class AbstractResetIntegrationTest { // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cleanGlobal(false, null, null); } @@ -546,6 +554,8 @@ public abstract class AbstractResetIntegrationTest { parameterList.add("--intermediate-topics"); parameterList.add(INTERMEDIATE_USER_TOPIC); } + + final Map<String, Object> sslConfig = getClientSslConfig(); if (sslConfig != null) { final File configFile = TestUtils.tempFile(); final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 1daeca9..ed04710 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -23,17 +23,16 @@ import org.apache.kafka.test.IntegrationTest; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Map; import java.util.Properties; /** * Tests local state store and global application cleanup. */ -@Ignore @Category({IntegrationTest.class}) public class ResetIntegrationTest extends AbstractResetIntegrationTest { @@ -51,6 +50,11 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { CLUSTER = new EmbeddedKafkaCluster(1, brokerProps); } + @Override + Map<String, Object> getClientSslConfig() { + return null; + } + @Before public void before() throws Exception { testId = TEST_ID; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java index 4bc5454..b66e042 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java @@ -25,16 +25,15 @@ import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Map; import java.util.Properties; /** * Tests command line SSL setup for reset tool. */ -@Ignore @Category({IntegrationTest.class}) public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { @@ -43,6 +42,8 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { private static final String TEST_ID = "reset-with-ssl-integration-test"; + private static Map<String, Object> sslConfig; + static { final Properties brokerProps = new Properties(); // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable @@ -63,6 +64,11 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { CLUSTER = new EmbeddedKafkaCluster(1, brokerProps); } + @Override + Map<String, Object> getClientSslConfig() { + return sslConfig; + } + @Before public void before() throws Exception { testId = TEST_ID; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 367e489..c33a720 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -104,7 +104,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { for (int i = 0; i < brokers.length; i++) { brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i); - log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp())); + log.debug("Starting a Kafka instance on port {} ...", brokerConfig.get(KafkaConfig$.MODULE$.PortProp())); brokers[i] = new KafkaEmbedded(brokerConfig, time); log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", diff --git a/streams/src/test/resources/log4j.properties b/streams/src/test/resources/log4j.properties index 91c909b..be36f90 100644 --- a/streams/src/test/resources/log4j.properties +++ b/streams/src/test/resources/log4j.properties @@ -18,4 +18,4 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.org.apache.kafka=INFO \ No newline at end of file +log4j.logger.org.apache.kafka=INFO -- To stop receiving notification emails like this one, please contact guozh...@apache.org.