mimaison commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1692881997


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -199,149 +153,74 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        maybeShutDownProducer();
-        triggerBrokerShutdown();
-        awaitBrokerShutdown();
-
-        if (deleteLogDirs)
-            deleteLogDirs();
-
-        if (stopZK)
-            stopZK();
-    }
-
-    private void maybeShutDownProducer() {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-    }
-
-    private void triggerBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void awaitBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.awaitShutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void deleteLogDirs() {
-        for (KafkaServer broker : brokers) {
-            try {
-                log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
-                CoreUtils.delete(broker.config().logDirs());
-            } catch (Throwable t) {
-                String msg = String.format("Could not clean up log dirs for broker at %s",
-                        address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void stopZK() {
-        try {
-            zookeeper.shutdown();
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
-        }
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link #stopTemporarily()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the
+     * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use a fixed non-zero free port. Also note that this is
+     * only possible when {@code numBrokers} is 1.
+     */
+    public void restart() {
+        cluster.brokers().values().forEach(BrokerServer::startup);
     }
 
-    private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
-        }
+    /**
+     * Stop the brokers (and controllers) in the Kafka cluster. This can be used to test Connect's functionality

Review Comment:
   That will teach me to trust method names! Thanks for the explanation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to