This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e043af  MINOR: Fix timing issue in advertised listener update test 
(#5256)
6e043af is described below

commit 6e043af1449264efd6a5b3638011134cc6b85888
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Mon Jun 25 10:52:31 2018 +0100

    MINOR: Fix timing issue in advertised listener update test (#5256)
    
    Wait for produce to fail before updating listener to avoid send succeeding 
after the listener update. Also use different topics in tests with connection 
failures where one is expected to fail and the other is expected to succeed.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 .../server/DynamicBrokerReconfigurationTest.scala  | 28 +++++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 96cca23..38b9f45 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -733,17 +733,25 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     // Verify that producer connections fail since advertised listener is 
invalid
     val bootstrap = TestUtils.bootstrapServers(servers, new 
ListenerName(SecureExternal))
       .replaceAll(invalidHost, "localhost") // allow bootstrap connection to 
succeed
-    val producer1 = 
ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).bootstrapServers(bootstrap).build()
+    val producer1 = ProducerBuilder().trustStoreProps(sslProperties1)
+      .maxRetries(0)
+      .requestTimeoutMs(1000)
+      .bootstrapServers(bootstrap)
+      .build()
 
-    val sendFuture = verifyConnectionFailure(producer1)
+    assertTrue(intercept[ExecutionException] {
+      producer1.send(new ProducerRecord(topic, "key", "value")).get(2, 
TimeUnit.SECONDS)
+    }.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException])
 
     alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, 
"localhost")
     servers.foreach(validateEndpointsInZooKeeper(_, endpoints => 
!endpoints.contains(invalidHost)))
 
     // Verify that produce/consume work now
+    val topic2 = "testtopic2"
+    TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = 
numServers, servers)
     val producer = 
ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).build()
-    val consumer = 
ConsumerBuilder("group2").trustStoreProps(sslProperties1).topic(topic).build()
-    verifyProduceConsume(producer, consumer, 10, topic)
+    val consumer = 
ConsumerBuilder("group2").trustStoreProps(sslProperties1).topic(topic2).build()
+    verifyProduceConsume(producer, consumer, 10, topic2)
 
     // Verify updating inter-broker listener
     val props = new Properties
@@ -756,9 +764,6 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
         assertTrue(s"Unexpected exception ${e.getCause}", 
e.getCause.isInstanceOf[InvalidRequestException])
         servers.foreach(server => assertEquals(SecureInternal, 
server.config.interBrokerListenerName.value))
     }
-
-    // Verify that the other send did not complete
-    verifyTimeout(sendFuture)
   }
 
   @Test
@@ -938,13 +943,15 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     val consumerFuture = verifyConnectionFailure(consumer1)
 
     // Test that other listeners still work
+    val topic2 = "testtopic2"
+    TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = 
numServers, servers)
     val producer2 = 
ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).build()
     val consumer2 = 
ConsumerBuilder(s"remove-listener-group2-$securityProtocol")
       .trustStoreProps(sslProperties1)
-      .topic(topic)
+      .topic(topic2)
       .autoOffsetReset("latest")
       .build()
-    verifyProduceConsume(producer2, consumer2, numRecords = 10, topic)
+    verifyProduceConsume(producer2, consumer2, numRecords = 10, topic2)
 
     // Verify that producer/consumer using old listener don't work
     verifyTimeout(producerFuture)
@@ -1361,13 +1368,16 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
   private case class ProducerBuilder() extends 
ClientBuilder[KafkaProducer[String, String]] {
     private var _retries = 0
     private var _acks = -1
+    private var _requestTimeoutMs = 30000L
 
     def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this 
}
     def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
+    def requestTimeoutMs(timeoutMs: Long): ProducerBuilder = { 
_requestTimeoutMs = timeoutMs; this }
 
     override def build(): KafkaProducer[String, String] = {
       val producer = TestUtils.createProducer(bootstrapServers,
         acks = _acks,
+        requestTimeoutMs = _requestTimeoutMs,
         retries = _retries,
         securityProtocol = _securityProtocol,
         trustStoreFile = Some(trustStoreFile1),

Reply via email to