This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6e043af MINOR: Fix timing issue in advertised listener update test (#5256) 6e043af is described below commit 6e043af1449264efd6a5b3638011134cc6b85888 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Mon Jun 25 10:52:31 2018 +0100 MINOR: Fix timing issue in advertised listener update test (#5256) Wait for produce to fail before updating listener to avoid send succeeding after the listener update. Also use different topics in tests with connection failures where one is expected to fail and the other is expected to succeed. Reviewers: Ismael Juma <ism...@juma.me.uk> --- .../server/DynamicBrokerReconfigurationTest.scala | 28 +++++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 96cca23..38b9f45 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -733,17 +733,25 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // Verify that producer connections fail since advertised listener is invalid val bootstrap = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal)) .replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed - val producer1 = ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).bootstrapServers(bootstrap).build() + val producer1 = ProducerBuilder().trustStoreProps(sslProperties1) + .maxRetries(0) + .requestTimeoutMs(1000) + .bootstrapServers(bootstrap) + .build() - val sendFuture = verifyConnectionFailure(producer1) + assertTrue(intercept[ExecutionException] { + producer1.send(new ProducerRecord(topic, "key", "value")).get(2, TimeUnit.SECONDS) + 
}.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException]) alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost") servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost))) // Verify that produce/consume work now + val topic2 = "testtopic2" + TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers) val producer = ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).build() - val consumer = ConsumerBuilder("group2").trustStoreProps(sslProperties1).topic(topic).build() - verifyProduceConsume(producer, consumer, 10, topic) + val consumer = ConsumerBuilder("group2").trustStoreProps(sslProperties1).topic(topic2).build() + verifyProduceConsume(producer, consumer, 10, topic2) // Verify updating inter-broker listener val props = new Properties @@ -756,9 +764,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet assertTrue(s"Unexpected exception ${e.getCause}", e.getCause.isInstanceOf[InvalidRequestException]) servers.foreach(server => assertEquals(SecureInternal, server.config.interBrokerListenerName.value)) } - - // Verify that the other send did not complete - verifyTimeout(sendFuture) } @Test @@ -938,13 +943,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val consumerFuture = verifyConnectionFailure(consumer1) // Test that other listeners still work + val topic2 = "testtopic2" + TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers) val producer2 = ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).build() val consumer2 = ConsumerBuilder(s"remove-listener-group2-$securityProtocol") .trustStoreProps(sslProperties1) - .topic(topic) + .topic(topic2) .autoOffsetReset("latest") .build() - verifyProduceConsume(producer2, consumer2, numRecords = 10, topic) + verifyProduceConsume(producer2, consumer2, numRecords = 10, topic2) 
// Verify that producer/consumer using old listener don't work verifyTimeout(producerFuture) @@ -1361,13 +1368,16 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] { private var _retries = 0 private var _acks = -1 + private var _requestTimeoutMs = 30000L def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this } def acks(acks: Int): ProducerBuilder = { _acks = acks; this } + def requestTimeoutMs(timeoutMs: Long): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this } override def build(): KafkaProducer[String, String] = { val producer = TestUtils.createProducer(bootstrapServers, acks = _acks, + requestTimeoutMs = _requestTimeoutMs, retries = _retries, securityProtocol = _securityProtocol, trustStoreFile = Some(trustStoreFile1),