vvcephei commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448581607



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable 
runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, 
runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), 
oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == 
KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is 
encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: 
Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health 
scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health 
scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't 
close");

Review comment:
       Sounds good.
   
   ```suggestion
               System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't 
close in time.");
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to