chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1689006610
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -95,98 +88,60 @@
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
/**
- * Setup an embedded Kafka cluster with specified number of brokers and
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}.
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to
easily create Kafka topics, produce data,
+ * consume data etc.
*/
public class EmbeddedKafkaCluster {
private static final Logger log =
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
- private static final long DEFAULT_PRODUCE_SEND_DURATION_MS =
TimeUnit.SECONDS.toMillis(120);
+ private static final long DEFAULT_PRODUCE_SEND_DURATION_MS =
TimeUnit.SECONDS.toMillis(120);
- // Kafka Config
- private final KafkaServer[] brokers;
+ private final KafkaClusterTestKit cluster;
private final Properties brokerConfig;
- private final Time time = Time.SYSTEM;
- private final int[] currentBrokerPorts;
- private final String[] currentBrokerLogDirs;
- private final boolean hasListenerConfig;
+ private final Map<String, String> clientConfigs;
- final Map<String, String> clientConfigs;
-
- private EmbeddedZookeeper zookeeper = null;
- private ListenerName listenerName = new ListenerName("PLAINTEXT");
private KafkaProducer<byte[], byte[]> producer;
- public EmbeddedKafkaCluster(final int numBrokers,
- final Properties brokerConfig) {
+ public EmbeddedKafkaCluster(final int numBrokers, final Properties
brokerConfig) {
this(numBrokers, brokerConfig, Collections.emptyMap());
}
public EmbeddedKafkaCluster(final int numBrokers,
- final Properties brokerConfig,
- final Map<String, String> clientConfigs) {
- brokers = new KafkaServer[numBrokers];
- currentBrokerPorts = new int[numBrokers];
- currentBrokerLogDirs = new String[numBrokers];
- this.brokerConfig = brokerConfig;
- // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we
track whether
- // a listener config is defined during initialization in order to know
if it's
- // safe to override it
- hasListenerConfig =
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+ final Properties brokerConfig,
+ final Map<String, String> clientConfigs) {
+ addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+ try {
+ KafkaClusterTestKit.Builder clusterBuilder = new
KafkaClusterTestKit.Builder(
Review Comment:
@C0urante Thanks for your response. I move the discussion to
https://issues.apache.org/jira/browse/KAFKA-17174 to avoid unnecessary noise in
this PR :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]