This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c853929c3a7c57f2292e676efe4286e34ddeea10
Author: Boyang Chen <boy...@confluent.io>
AuthorDate: Thu Feb 6 10:58:05 2020 -0800

    HOTFIX: Fix SpotBugs failure in Kafka examples (#8051)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/examples/KafkaConsumerProducerDemo.java    |  8 +++++++-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java    | 20 ++++++++++++++------
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 561732b..21d85c3 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -16,6 +16,8 @@
  */
 package kafka.examples;
 
+import org.apache.kafka.common.errors.TimeoutException;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -28,7 +30,11 @@ public class KafkaConsumerProducerDemo {
 
         Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, 
"DemoConsumer", false, 10000, latch);
         consumerThread.start();
-        latch.await(5, TimeUnit.MINUTES);
+
+        if (!latch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for 
demo producer and consumer to finish");
+        }
+
         consumerThread.shutdown();
         System.out.println("All finished!");
     }
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index d418eba..288b786 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -19,6 +19,7 @@ package kafka.examples;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 
@@ -77,9 +78,9 @@ public class KafkaExactlyOnceDemo {
         }
 
         String mode = args[0];
-        int numPartitions = Integer.valueOf(args[1]);
-        int numInstances = Integer.valueOf(args[2]);
-        int numRecords = Integer.valueOf(args[3]);
+        int numPartitions = Integer.parseInt(args[1]);
+        int numInstances = Integer.parseInt(args[2]);
+        int numRecords = Integer.parseInt(args[3]);
 
         /* Stage 1: topic cleanup and recreation */
         recreateTopics(numPartitions);
@@ -90,7 +91,9 @@ public class KafkaExactlyOnceDemo {
         Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, 
numRecords, prePopulateLatch);
         producerThread.start();
 
-        prePopulateLatch.await(5, TimeUnit.MINUTES);
+        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for 
data pre-population");
+        }
 
         CountDownLatch transactionalCopyLatch = new 
CountDownLatch(numInstances);
 
@@ -102,7 +105,9 @@ public class KafkaExactlyOnceDemo {
             messageProcessor.start();
         }
 
-        transactionalCopyLatch.await(5, TimeUnit.MINUTES);
+        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for 
transactionally message copy");
+        }
 
         CountDownLatch consumeLatch = new CountDownLatch(1);
 
@@ -110,7 +115,10 @@ public class KafkaExactlyOnceDemo {
         Consumer consumerThread = new Consumer(OUTPUT_TOPIC, 
"Verify-consumer", true, numRecords, consumeLatch);
         consumerThread.start();
 
-        consumeLatch.await(5, TimeUnit.MINUTES);
+        if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for 
output data consumption");
+        }
+
         consumerThread.shutdown();
         System.out.println("All finished!");
     }

Reply via email to