C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1690265815


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -199,149 +153,74 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        maybeShutDownProducer();
-        triggerBrokerShutdown();
-        awaitBrokerShutdown();
-
-        if (deleteLogDirs)
-            deleteLogDirs();
-
-        if (stopZK)
-            stopZK();
-    }
-
-    private void maybeShutDownProducer() {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-    }
-
-    private void triggerBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void awaitBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.awaitShutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Failed while awaiting shutdown of 
broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void deleteLogDirs() {
-        for (KafkaServer broker : brokers) {
-            try {
-                log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                CoreUtils.delete(broker.config().logDirs());
-            } catch (Throwable t) {
-                String msg = String.format("Could not clean up log dirs for 
broker at %s",
-                        address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void stopZK() {
-        try {
-            zookeeper.shutdown();
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
-        }
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopTemporarily()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use 
a fixed non-zero free port. Also note that this is
+     * only possible when {@code numBrokers} is 1.
+     */
+    public void restart() {
+        cluster.brokers().values().forEach(BrokerServer::startup);
     }
 
-    private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
-        }
+    /**
+     * Stop the brokers (and controllers) in the Kafka cluster. This can be 
used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopTemporarily() {
+        cluster.brokers().values().forEach(BrokerServer::shutdown);
+        cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
     }
 
-    private String createLogDir() {
-        try {
-            return 
Files.createTempDirectory(getClass().getSimpleName()).toString();
-        } catch (IOException e) {
-            log.error("Unable to create temporary log directory", e);
-            throw new ConnectException("Unable to create temporary log 
directory", e);
+    public void stop() {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        Utils.closeQuietly(producer, "producer for embedded Kafka cluster", 
shutdownFailure);
+        Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
+        if (shutdownFailure.get() != null) {
+            throw new ConnectException("Failed to shut down producer / 
embedded Kafka cluster", shutdownFailure.get());
         }
     }
 
     public String bootstrapServers() {
-        return Arrays.stream(brokers)
-                .map(this::address)
-                .collect(Collectors.joining(","));
-    }
-
-    public String address(KafkaServer server) {
-        final EndPoint endPoint = server.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+        return cluster.bootstrapServers();
     }
 
     /**
      * Get the brokers that have a {@link BrokerState#RUNNING} state.
      *
-     * @return the list of {@link KafkaServer} instances that are running;
-     *         never null but  possibly empty
+     * @return the set of {@link BrokerServer} instances that are running;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> runningBrokers() {
+    public Set<BrokerServer> runningBrokers() {
         return brokersInState(state -> state == BrokerState.RUNNING);
     }
 
     /**
      * Get the brokers whose state match the given predicate.
      *
-     * @return the list of {@link KafkaServer} instances with states that 
match the predicate;
-     *         never null but  possibly empty
+     * @return the set of {@link BrokerServer} instances with states that 
match the predicate;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> brokersInState(Predicate<BrokerState> 
desiredState) {
-        return Arrays.stream(brokers)
-                     .filter(b -> hasState(b, desiredState))
-                     .collect(Collectors.toSet());
+    public Set<BrokerServer> brokersInState(Predicate<BrokerState> 
desiredState) {
+        return cluster.brokers().values().stream()
+                .filter(b -> hasState(b, desiredState))
+                .collect(Collectors.toSet());
     }
 
-    protected boolean hasState(KafkaServer server, Predicate<BrokerState> 
desiredState) {
+    protected boolean hasState(BrokerServer server, Predicate<BrokerState> 
desiredState) {
         try {
             return desiredState.test(server.brokerState());

Review Comment:
   Sure, removed the try-catch 👍



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -447,9 +326,9 @@ public void createTopic(String topic, int partitions, int 
replication, Map<Strin
      * @param adminClientConfig Additional admin client configuration settings.
      */
     public void createTopic(String topic, int partitions, int replication, 
Map<String, String> topicConfig, Map<String, Object> adminClientConfig) {
-        if (replication > brokers.length) {
+        if (replication > cluster.brokers().size()) {

Review Comment:
   This logic is basically unchanged from the current state of things; the only 
reason it shows up in the diff is because there's a slightly different internal 
API for counting the number of brokers. I'm not sure why we had it originally, 
but unless it would lead to a regression I'd prefer not to block this PR on 
figuring that out, and would rather save it for a follow-up. Is that alright?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -1369,13 +1366,46 @@ private void createTopics() {
     /*
      * Generate some consumer activity on both clusters to ensure the 
checkpoint connector always starts promptly
      */
-    protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        try (Consumer<byte[], byte[]> dummyConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
-        }
-        try (Consumer<byte[], byte[]> dummyConsumer = 
backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+    protected final void warmUpConsumer(Map<String, Object> consumerProps) {
+        final String topic = "test-topic-1";
+        warmUpConsumer("primary", primary.kafka(), consumerProps, topic, 
NUM_PARTITIONS);
+        warmUpConsumer("backup", backup.kafka(), consumerProps, topic, 
NUM_PARTITIONS);
+    }
+
+    private void warmUpConsumer(String clusterName, EmbeddedKafkaCluster 
kafkaCluster, Map<String, Object> consumerProps, String topic, int 
numPartitions) {
+        try (Consumer<?, ?> dummyConsumer = 
kafkaCluster.createConsumerAndSubscribeTo(consumerProps, topic)) {
+            // poll to ensure we've joined the group
             dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+
+            // force the consumer to have a known position on every topic 
partition
+            // so that it will be able to commit offsets for that position
+            // (it's possible that poll returns before that has happened)
+            Set<TopicPartition> topicPartitionsPendingPosition = 
IntStream.range(0, NUM_PARTITIONS)

Review Comment:
   Thanks, good catch! Removed `numPartitions`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to