Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-30 Thread via GitHub


chia7712 merged PR #19651:
URL: https://github.com/apache/kafka/pull/19651


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-29 Thread via GitHub


chia7712 commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2114139043


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java:
##
@@ -226,4 +249,205 @@ var record = new ProducerRecord<>(
 producer.send(record);
 return record;
 }
+
+public static void sendAndAwaitAsyncCommit(
+Consumer consumer,
+Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
+) throws InterruptedException {
+
+var commitCallback = new RetryCommitCallback(consumer, offsetsOpt);
+sendAsyncCommit(consumer, commitCallback, offsetsOpt);
+
+TestUtils.waitForCondition(() -> {
+consumer.poll(Duration.ofMillis(100));
+return commitCallback.isComplete;
+}, "Failed to observe commit callback before timeout");
+
+assertEquals(Optional.empty(), commitCallback.error);
+}
+
+public static void awaitRebalance(
+Consumer consumer,
+TestConsumerReassignmentListener rebalanceListener
+) throws InterruptedException {
+var numReassignments = rebalanceListener.callsToAssigned;
+TestUtils.waitForCondition(() -> {
+consumer.poll(Duration.ofMillis(100));
+return rebalanceListener.callsToAssigned > numReassignments;
+}, "Timed out before expected rebalance completed");
+}
+
+public static void ensureNoRebalance(
+Consumer consumer,
+TestConsumerReassignmentListener rebalanceListener
+) throws InterruptedException {
+// The best way to verify that the current membership is still active is to commit offsets.
+// This would fail if the group had rebalanced.
+var initialRevokeCalls = rebalanceListener.callsToRevoked;
+sendAndAwaitAsyncCommit(consumer, Optional.empty());
+assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked);
+}
+
+/**
+ * This class is intended to replace the test cases in BaseConsumerTest.scala.
+ * When converting tests that extend from BaseConsumerTest.scala to Java,
+ * we should use the test cases provided in this class.
+ */
+public static final class BaseConsumerTestcase {
+
+public static final int BROKER_COUNT = 3;
+public static final String TOPIC = "topic";
+public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+private BaseConsumerTestcase() {
+}
+
+public static void testSimpleConsumption(
+ClusterInstance cluster,
+Map config
+) throws InterruptedException {
+var numRecords = 1;
+var startingTimestamp = System.currentTimeMillis();
+sendRecords(cluster, TP, numRecords, startingTimestamp);
+try (Consumer consumer = cluster.consumer(config)) {
+assertEquals(0, consumer.assignment().size());
+consumer.assign(List.of(TP));
+assertEquals(1, consumer.assignment().size());
+consumer.seek(TP, 0);
+consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+// check async commit callbacks
+sendAndAwaitAsyncCommit(consumer, Optional.empty());
+}
+}
+
+public static void testClusterResourceListener(
+ClusterInstance cluster,
+Map consumerConfig
+) throws InterruptedException {
+var numRecords = 100;
+Map producerConfig = Map.of(
+KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+);
+Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+try (Producer producer = cluster.producer(producerConfig);
+ Consumer consumer = cluster.consumer(consumerConfigOverrides)
+) {
+var startingTimestamp = System.currentTimeMillis();
+sendRecords(producer, TP, numRecords, startingTimestamp, -1);
+
+consumer.subscribe(List.of(TP.topic()));
+consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+assertNotEquals(0, UPDATE_PRODUCER_COUNT.get());

Review Comment:
   Should we reset these counters so that other tests cannot corrupt them?
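
   A minimal sketch of the idea, assuming the counters are shared static `AtomicInteger` fields as in the quoted code. The class `CounterResetSketch` and its methods are hypothetical and are not part of the PR; the point is only that zeroing the counter before the body runs keeps increments left over from an earlier test out of the assertion.

```java
import java.util.concurrent.atomic.AtomicInteger;

public final class CounterResetSketch {
    // Hypothetical stand-in for a shared static counter such as UPDATE_PRODUCER_COUNT.
    public static final AtomicInteger UPDATE_COUNT = new AtomicInteger();

    private CounterResetSketch() {
    }

    // Simulates a listener callback incrementing the shared counter.
    public static void onUpdate() {
        UPDATE_COUNT.incrementAndGet();
    }

    // Resets the counter, runs the body, and returns only the increments made by the body.
    public static int countUpdatesDuring(Runnable body) {
        UPDATE_COUNT.set(0);
        body.run();
        return UPDATE_COUNT.get();
    }

    public static void main(String[] args) {
        onUpdate(); // leftover increment from an "earlier test"
        int observed = countUpdatesDuring(() -> {
            onUpdate();
            onUpdate();
        });
        System.out.println(observed); // prints 2
    }
}
```

   In a JUnit test the same reset could live in a `@BeforeEach` method instead of a wrapper like this.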




Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-16 Thread via GitHub


chia7712 commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2093375272


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java:
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class BaseConsumerTest {

Review Comment:
   Could you please rename it to `ConsumerTestUtils`? Additionally, please add a
private constructor and the `final` keyword so that the class cannot be extended.
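
   A rough sketch of the requested shape. The class name `ConsumerTestUtilsSketch` and the helper `partitionLabel` are hypothetical, chosen only for illustration; `BROKER_COUNT = 3` is taken from the quoted code. `final` blocks subclassing and the private constructor blocks instantiation from outside the class.

```java
public final class ConsumerTestUtilsSketch {
    public static final int BROKER_COUNT = 3;

    // Private constructor: callers use the static members only.
    private ConsumerTestUtilsSketch() {
    }

    // Hypothetical static helper, included only so the class has some behaviour.
    public static String partitionLabel(String topic, int partition) {
        return topic + "-" + partition;
    }

    public static void main(String[] args) {
        System.out.println(partitionLabel("topic", 0)); // prints topic-0
    }
}
```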






Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-13 Thread via GitHub


m1a2st commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2086737025


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##
@@ -2994,35 +2992,4 @@ private void waitForAssignment(String groupId, List tps) {
 throw new RuntimeException(e);
 }
 }
-
-public static class SerializerImpl implements Serializer {

Review Comment:
   I noticed that `BaseConsumerTest.scala` contains the same class, but it is
only used in `PlaintextConsumerTest.scala`. So we should move this class to a
utility class when we rewrite `PlaintextConsumerTest.scala` in Java. I will
revert this change for now.






Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-12 Thread via GitHub


chia7712 commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2086042662


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##
@@ -2994,35 +2992,4 @@ private void waitForAssignment(String groupId, List tps) {
 throw new RuntimeException(e);
 }
 }
-
-public static class SerializerImpl implements Serializer {

Review Comment:
   Why move it to `ClientsTestUtils`?






Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-12 Thread via GitHub


chia7712 commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2084667052


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java:
##
@@ -0,0 +1,155 @@
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testClusterResourceListener;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testCoordinatorFailover;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testSimpleConsumption;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.SaslPlainPlaintextConsumerTest.MECHANISMS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG;
+import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+
+@ClusterTestDefaults(
+types = {Type.KRAFT},
+brokers = BaseConsumerTest.BROKER_COUNT,
+serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"),
+@ClusterConfigProperty(key = SASL_ENABLED_MECHANISMS_CONFIG, value = MECHANISMS),
+@ClusterConfigProperty(key = SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, value = MECHANISMS),
+}
+)
+public class SaslPlainPlaintextConsumerTest {
+
+private final ClusterInstance cluster;
+private static final String USERNAME = "plain-admin";

Review Comment:
   ditto




Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-08 Thread via GitHub


frankvicky commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2080971393


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java:
##
@@ -0,0 +1,247 @@
+public class BaseConsumerTest {
+
+public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
+public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger();
+public static final int BROKER_COUNT = 3;
+
+public static class Testcase {
+
+public static final String TOPIC = "topic";
+public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+public static void testSimpleConsumption(
+ClusterInstance cluster, 
+Map config
+) throws InterruptedException {
+var numRecords = 1;
+var startingTimestamp = System.currentTimeMillis();
+sendRecords(cluster, TP, numRecords, startingTimestamp);
+try (Consumer consumer = cluster.consumer(config)) {
+assertEquals(0, consumer.assignment().size());
+consumer.assign(List.of(TP));
+assertEquals(1, consumer.assignment().size());
+
+consumer.seek(TP, 0);
+consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+// check async commit callbacks
+sendAndAwaitAsyncCommit(consumer, Optional.empty());
+}
+}
+
+public static void testClusterResourceListener(
+ClusterInstance cluster,
+Map consumerConfig
+) throws InterruptedException {
+var numRecords = 100;
+Map producerConfig = Map.of(
+KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+);
+Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
   Thanks!






Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-08 Thread via GitHub


m1a2st commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2080919129


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java:
##
@@ -0,0 +1,247 @@
+public static void testClusterResourceListener(
+ClusterInstance cluster,
+Map consumerConfig
+) throws InterruptedException {
+var numRecords = 100;
+Map producerConfig = Map.of(
+KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+);
+Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
   Without the extra security settings on the consumer, the test still passes,
but without using SASL/PLAIN over plaintext.
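
   A minimal sketch of why the caller's config is copied rather than replaced. The class `ConfigOverrideSketch`, the method `withOverrides`, and the placeholder value `example.KeyDeserializer` are all hypothetical; the config keys are the standard Kafka client property names. The copy keeps whatever security settings the caller passed in, and the overrides are layered on top.

```java
import java.util.HashMap;
import java.util.Map;

public final class ConfigOverrideSketch {
    private ConfigOverrideSketch() {
    }

    // Copies the caller's config into a mutable map, then applies the overrides on top.
    public static Map<String, Object> withOverrides(Map<String, Object> base, Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> base = Map.of(
            "security.protocol", "SASL_PLAINTEXT",
            "sasl.mechanism", "PLAIN");
        Map<String, Object> overrides = Map.of("key.deserializer", "example.KeyDeserializer");
        Map<String, Object> merged = withOverrides(base, overrides);
        System.out.println(merged.get("security.protocol")); // prints SASL_PLAINTEXT
        System.out.println(merged.size()); // prints 3
    }
}
```

   Building the consumer config from the overrides alone would drop the two security entries, which is the situation described above.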




Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-08 Thread via GitHub


frankvicky commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2080748529


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java:
##
@@ -0,0 +1,247 @@
+public static void testClusterResourceListener(
+ClusterInstance cluster,
+Map consumerConfig
+) throws InterruptedException {
+var numRecords = 100;
+Map producerConfig = Map.of(
+KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+);
+Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
   I get your point.
   But it seems that the tests also pass without `consumerConfig`.
   Could you verify that?




Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-08 Thread via GitHub


frankvicky commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2080199141


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java:
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class BaseConsumerTest {
+
+public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
+public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger();
+public static final int BROKER_COUNT = 3;
+
+public static class Testcase {
+
+public static final String TOPIC = "topic";
+public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+public static void testSimpleConsumption(
+ClusterInstance cluster, 
+Map config
+) throws InterruptedException {
+var numRecords = 1;
+var startingTimestamp = System.currentTimeMillis();
+sendRecords(cluster, TP, numRecords, startingTimestamp);
+try (Consumer consumer = cluster.consumer(config)) {
+assertEquals(0, consumer.assignment().size());
+consumer.assign(List.of(TP));
+assertEquals(1, consumer.assignment().size());
+
+consumer.seek(TP, 0);
+consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+// check async commit callbacks
+sendAndAwaitAsyncCommit(consumer, Optional.empty());
+}
+}
+
+public static void testClusterResourceListener(
+ClusterInstance cluster,
+Map consumerConfig
+) throws InterruptedException {
+var numRecords = 100;
+Map producerConfig = Map.of(
+KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+);
+Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
   I don't follow.
   I tried `Map.of` on my local machine and it works just fine.




Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-08 Thread via GitHub


FrankYang0529 commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2079722269


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testClusterResourceListener;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testCoordinatorFailover;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testSimpleConsumption;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.SaslPlainPlaintextConsumerTest.MECHANISMS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG;
+import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+
+@ClusterTestDefaults(
+types = {Type.KRAFT},
+brokers = BaseConsumerTest.BROKER_COUNT,
+serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"),

Review Comment:
   It looks like the default value is 3. Do we need to specify it here?
   
   
https://github.com/apache/kafka/blob/98e535b5240beccba069ae10dc6692d1b842e9ac/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java#L98-L99






Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-08 Thread via GitHub


m1a2st commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2079548501


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java:
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class BaseConsumerTest {
+
+public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
+public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger();
+public static final int BROKER_COUNT = 3;
+
+public static class Testcase {
+
+public static final String TOPIC = "topic";
+public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+public static void testSimpleConsumption(
+ClusterInstance cluster, 
+Map config
+) throws InterruptedException {
+var numRecords = 1;
+var startingTimestamp = System.currentTimeMillis();
+sendRecords(cluster, TP, numRecords, startingTimestamp);
+try (Consumer consumer = cluster.consumer(config)) {
+assertEquals(0, consumer.assignment().size());
+consumer.assign(List.of(TP));
+assertEquals(1, consumer.assignment().size());
+
+consumer.seek(TP, 0);
+consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+// check async commit callbacks
+sendAndAwaitAsyncCommit(consumer, Optional.empty());
+}
+}
+
+public static void testClusterResourceListener(
+ClusterInstance cluster,
+Map consumerConfig
+) throws InterruptedException {
+var numRecords = 100;
+Map producerConfig = Map.of(
+KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+);
+Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
   We need to override entries in `consumerConfig`, so we cannot use `Map.of` here.
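
   To illustrate the point about overriding, here is a minimal standalone sketch, not the code from this PR. It assumes the caller's config may already contain the keys being overridden; the class name `ConfigOverrideSketch`, the helper `withOverrides`, and the string values are hypothetical and used only for illustration. It shows that a map returned by `Map.of` rejects `put`, while a `HashMap` copy accepts overrides on top of the caller's entries.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigOverrideSketch {

    // Hypothetical helper: copies the base config into a mutable map,
    // then applies the overrides on top of it. The base map is not modified.
    static Map<String, Object> withOverrides(Map<String, Object> base, Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for the consumerConfig passed into the test method.
        Map<String, Object> consumerConfig = Map.of(
            "group.id", "my-group",
            "key.deserializer", "default-deserializer"
        );

        // Map.of returns an unmodifiable map, so put() throws instead of overriding.
        try {
            consumerConfig.put("key.deserializer", "custom-deserializer");
        } catch (UnsupportedOperationException e) {
            System.out.println("Map.of result is immutable");
        }

        // Copy-then-override keeps the caller's entries and replaces only the chosen key.
        Map<String, Object> merged = withOverrides(consumerConfig, Map.of("key.deserializer", "custom-deserializer"));
        System.out.println(merged.get("key.deserializer"));
        System.out.println(merged.get("group.id"));
    }
}
```

   `Map.of` remains a reasonable fit for a config built from scratch, such as `producerConfig` in the quoted hunk, since nothing is merged into it afterwards.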




Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]

2025-05-08 Thread via GitHub


frankvicky commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2079140399


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java:
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class BaseConsumerTest {
+
+public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
+public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger();
+public static final int BROKER_COUNT = 3;
+
+public static class Testcase {
+
+public static final String TOPIC = "topic";
+public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+public static void testSimpleConsumption(
+ClusterInstance cluster, 
+Map config
+) throws InterruptedException {
+var numRecords = 1;
+var startingTimestamp = System.currentTimeMillis();
+sendRecords(cluster, TP, numRecords, startingTimestamp);
+try (Consumer consumer = cluster.consumer(config)) {
+assertEquals(0, consumer.assignment().size());
+consumer.assign(List.of(TP));
+assertEquals(1, consumer.assignment().size());
+
+consumer.seek(TP, 0);
+consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+// check async commit callbacks
+sendAndAwaitAsyncCommit(consumer, Optional.empty());
+}
+}
+
+public static void testClusterResourceListener(
+ClusterInstance cluster,
+Map consumerConfig
+) throws InterruptedException {
+var numRecords = 100;
+Map producerConfig = Map.of(
+KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+);
+Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
   Consider using `Map.of` here.


