Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
chia7712 merged PR #19651: URL: https://github.com/apache/kafka/pull/19651 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
chia7712 commented on code in PR #19651: URL: https://github.com/apache/kafka/pull/19651#discussion_r2114139043 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java: ## @@ -226,4 +249,205 @@ var record = new ProducerRecord<>( producer.send(record); return record; } + +public static void sendAndAwaitAsyncCommit( +Consumer consumer, +Optional> offsetsOpt +) throws InterruptedException { + +var commitCallback = new RetryCommitCallback(consumer, offsetsOpt); +sendAsyncCommit(consumer, commitCallback, offsetsOpt); + +TestUtils.waitForCondition(() -> { +consumer.poll(Duration.ofMillis(100)); +return commitCallback.isComplete; +}, "Failed to observe commit callback before timeout"); + +assertEquals(Optional.empty(), commitCallback.error); +} + +public static void awaitRebalance( +Consumer consumer, +TestConsumerReassignmentListener rebalanceListener +) throws InterruptedException { +var numReassignments = rebalanceListener.callsToAssigned; +TestUtils.waitForCondition(() -> { +consumer.poll(Duration.ofMillis(100)); +return rebalanceListener.callsToAssigned > numReassignments; +}, "Timed out before expected rebalance completed"); +} + +public static void ensureNoRebalance( +Consumer consumer, +TestConsumerReassignmentListener rebalanceListener +) throws InterruptedException { +// The best way to verify that the current membership is still active is to commit offsets. +// This would fail if the group had rebalanced. +var initialRevokeCalls = rebalanceListener.callsToRevoked; +sendAndAwaitAsyncCommit(consumer, Optional.empty()); +assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked); +} + +/** + * This class is intended to replace the test cases in BaseConsumerTest.scala. + * When converting tests that extend from BaseConsumerTest.scala to Java, + * we should use the test cases provided in this class. 
+ */ +public static final class BaseConsumerTestcase { + +public static final int BROKER_COUNT = 3; +public static final String TOPIC = "topic"; +public static final TopicPartition TP = new TopicPartition(TOPIC, 0); + +private BaseConsumerTestcase() { +} + +public static void testSimpleConsumption( +ClusterInstance cluster, +Map config +) throws InterruptedException { +var numRecords = 1; +var startingTimestamp = System.currentTimeMillis(); +sendRecords(cluster, TP, numRecords, startingTimestamp); +try (Consumer consumer = cluster.consumer(config)) { +assertEquals(0, consumer.assignment().size()); +consumer.assign(List.of(TP)); +assertEquals(1, consumer.assignment().size()); +consumer.seek(TP, 0); +consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); +// check async commit callbacks +sendAndAwaitAsyncCommit(consumer, Optional.empty()); +} +} + +public static void testClusterResourceListener( +ClusterInstance cluster, +Map consumerConfig +) throws InterruptedException { +var numRecords = 100; +Map producerConfig = Map.of( +KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class, +VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class +); +Map consumerConfigOverrides = new HashMap<>(consumerConfig); +consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class); +consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class); +try (Producer producer = cluster.producer(producerConfig); + Consumer consumer = cluster.consumer(consumerConfigOverrides) +) { +var startingTimestamp = System.currentTimeMillis(); +sendRecords(producer, TP, numRecords, startingTimestamp, -1); + +consumer.subscribe(List.of(TP.topic())); +consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); +assertNotEquals(0, UPDATE_PRODUCER_COUNT.get()); Review Comment: Should we reset those counts to avoid getting corrupted by other tests? 
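The reset question above concerns the static `UPDATE_PRODUCER_COUNT` and `UPDATE_CONSUMER_COUNT` counters, which any test in the same JVM can increment. A minimal sketch of one way to reset them follows, assuming a hypothetical holder class `ListenerCounters` and a hypothetical `reset()` helper; the thread does not show whether the merged PR resets the counters this way.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the class that owns the shared counters in the PR.
final class ListenerCounters {
    static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
    static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger();

    private ListenerCounters() {
    }

    // Hypothetical helper: call it at the start of each test (for example from
    // a JUnit @BeforeEach method) so counts left by earlier tests are discarded.
    static void reset() {
        UPDATE_PRODUCER_COUNT.set(0);
        UPDATE_CONSUMER_COUNT.set(0);
    }

    public static void main(String[] args) {
        // Simulate an earlier test that left non-zero counts behind.
        UPDATE_PRODUCER_COUNT.incrementAndGet();
        UPDATE_CONSUMER_COUNT.addAndGet(3);

        reset();

        System.out.println("producer=" + UPDATE_PRODUCER_COUNT.get()
            + " consumer=" + UPDATE_CONSUMER_COUNT.get());
    }
}
```

With the counters reset before each test, an assertion such as `assertNotEquals(0, UPDATE_PRODUCER_COUNT.get())` only reflects updates made by the test that is currently running.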
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
chia7712 commented on code in PR #19651: URL: https://github.com/apache/kafka/pull/19651#discussion_r2093375272 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java: ## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class BaseConsumerTest { Review Comment: Could you please rename it to `ConsumerTestUtils`? Additionally, please add a private constructor and the `final` keyword so that it cannot be extended.
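The request above (a `final` class with a private constructor) is the usual Java utility-class idiom. A minimal sketch follows; `ConsumerTestUtilsSketch` and its `describe` helper are hypothetical illustrations, not the class that was merged, and only `BROKER_COUNT` is taken from the diff.

```java
// Hypothetical sketch of the utility-class idiom requested in the review.
// 'final' prevents subclassing; the private constructor prevents instantiation.
final class ConsumerTestUtilsSketch {

    // Value taken from the diff in this thread.
    static final int BROKER_COUNT = 3;

    private ConsumerTestUtilsSketch() {
    }

    // Hypothetical static helper, included only so the class has a member to call.
    static String describe(String topic, int partition) {
        return topic + "-" + partition;
    }

    public static void main(String[] args) {
        System.out.println(describe("topic", 0));
    }
}
```

Because every member is static, callers use `ConsumerTestUtilsSketch.describe(...)` directly, and a declaration such as `class Foo extends ConsumerTestUtilsSketch` would not compile.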
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
m1a2st commented on code in PR #19651: URL: https://github.com/apache/kafka/pull/19651#discussion_r2086737025 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ## @@ -2994,35 +2992,4 @@ private void waitForAssignment(String groupId, List tps) { throw new RuntimeException(e); } } - -public static class SerializerImpl implements Serializer { Review Comment: I noticed that `BaseConsumerTest.scala` contains the same class, but it is only used in `PlaintextConsumerTest.scala`. So, when we rewrite `PlaintextConsumerTest.scala` in Java, we should move this class to a utility class. I will revert this change for now.
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
chia7712 commented on code in PR #19651: URL: https://github.com/apache/kafka/pull/19651#discussion_r2086042662 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ## @@ -2994,35 +2992,4 @@ private void waitForAssignment(String groupId, List tps) { throw new RuntimeException(e); } } - -public static class SerializerImpl implements Serializer { Review Comment: Why move it to `ClientsTestUtils`?
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
chia7712 commented on code in PR #19651: URL: https://github.com/apache/kafka/pull/19651#discussion_r2084667052 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.Locale; +import java.util.Map; + +import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testClusterResourceListener; +import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testCoordinatorFailover; +import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testSimpleConsumption; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.consumer.SaslPlainPlaintextConsumerTest.MECHANISMS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; +import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG; +import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; + +@ClusterTestDefaults( +types = {Type.KRAFT}, +brokers = 
BaseConsumerTest.BROKER_COUNT, +serverProperties = { +@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), +@ClusterConfigProperty(key = SASL_ENABLED_MECHANISMS_CONFIG, value = MECHANISMS), +@ClusterConfigProperty(key = SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, value = MECHANISMS), +} +) +public class SaslPlainPlaintextConsumerTest { + +private final ClusterInstance cluster; +private static final String USERNAME = "plain-admin"; Review Comment: ditto ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kaf
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
frankvicky commented on code in PR #19651: URL: https://github.com/apache/kafka/pull/19651#discussion_r2080971393 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java: ## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class BaseConsumerTest { + +public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger(); +public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger(); +public static final int BROKER_COUNT = 3; + +public static class Testcase { + +public static final String TOPIC = "topic"; +public static final TopicPartition TP = new TopicPartition(TOPIC, 0); + +public static void testSimpleConsumption( +ClusterInstance cluster, +Map config +) throws 
InterruptedException { +var numRecords = 1; +var startingTimestamp = System.currentTimeMillis(); +sendRecords(cluster, TP, numRecords, startingTimestamp); +try (Consumer consumer = cluster.consumer(config)) { +assertEquals(0, consumer.assignment().size()); +consumer.assign(List.of(TP)); +assertEquals(1, consumer.assignment().size()); + +consumer.seek(TP, 0); +consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); +// check async commit callbacks +sendAndAwaitAsyncCommit(consumer, Optional.empty()); +} +} + +public static void testClusterResourceListener( +ClusterInstance cluster, +Map consumerConfig +) throws InterruptedException { +var numRecords = 100; +Map producerConfig = Map.of( +KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class, +VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class +); +Map consumerConfigOverrides = new HashMap<>(consumerConfig); +consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class); +consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class); Review Comment: Thanks!
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
m1a2st commented on code in PR #19651: URL: https://github.com/apache/kafka/pull/19651#discussion_r2080919129 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java: ## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class BaseConsumerTest { + +public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger(); +public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger(); +public static final int BROKER_COUNT = 3; + +public static class Testcase { + +public static final String TOPIC = "topic"; +public static final TopicPartition TP = new TopicPartition(TOPIC, 0); + +public static void testSimpleConsumption( +ClusterInstance cluster, +Map config +) throws 
InterruptedException { +var numRecords = 1; +var startingTimestamp = System.currentTimeMillis(); +sendRecords(cluster, TP, numRecords, startingTimestamp); +try (Consumer consumer = cluster.consumer(config)) { +assertEquals(0, consumer.assignment().size()); +consumer.assign(List.of(TP)); +assertEquals(1, consumer.assignment().size()); + +consumer.seek(TP, 0); +consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); +// check async commit callbacks +sendAndAwaitAsyncCommit(consumer, Optional.empty()); +} +} + +public static void testClusterResourceListener( +ClusterInstance cluster, +Map consumerConfig +) throws InterruptedException { +var numRecords = 100; +Map producerConfig = Map.of( +KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class, +VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class +); +Map consumerConfigOverrides = new HashMap<>(consumerConfig); +consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class); +consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class); Review Comment: Without extra security settings on the consumer, the test will pass without SASL/PLAIN plaintext.
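For readers unfamiliar with the "extra security settings" mentioned above, the following is a minimal sketch of the client-side properties a consumer typically needs to connect with SASL/PLAIN over a plaintext transport. The key names are standard Kafka client configuration keys written as string literals; the helper name `saslPlainSettings` and the password are hypothetical, and the actual values used by `SaslPlainPlaintextConsumerTest` are not shown in this thread apart from the `plain-admin` username.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that assembles SASL/PLAIN client settings.
final class SaslPlainClientConfigSketch {

    private SaslPlainClientConfigSketch() {
    }

    static Map<String, Object> saslPlainSettings(String username, String password) {
        Map<String, Object> config = new HashMap<>();
        // SASL authentication over an unencrypted (plaintext) connection.
        config.put("security.protocol", "SASL_PLAINTEXT");
        // Username/password mechanism.
        config.put("sasl.mechanism", "PLAIN");
        // JAAS entry naming Kafka's PLAIN login module and the credentials.
        config.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return config;
    }

    public static void main(String[] args) {
        // "plain-admin" appears in the diff; the password here is a placeholder.
        System.out.println(saslPlainSettings("plain-admin", "example-secret"));
    }
}
```

A map like this would be passed along as the consumer configuration so that the client authenticates with the mechanism the test is meant to exercise.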
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
frankvicky commented on code in PR #19651: URL: https://github.com/apache/kafka/pull/19651#discussion_r2080748529 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java: ## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class BaseConsumerTest { + +public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger(); +public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger(); +public static final int BROKER_COUNT = 3; + +public static class Testcase { + +public static final String TOPIC = "topic"; +public static final TopicPartition TP = new TopicPartition(TOPIC, 0); + +public static void testSimpleConsumption( +ClusterInstance cluster, +Map config +) throws 
InterruptedException { +var numRecords = 1; +var startingTimestamp = System.currentTimeMillis(); +sendRecords(cluster, TP, numRecords, startingTimestamp); +try (Consumer consumer = cluster.consumer(config)) { +assertEquals(0, consumer.assignment().size()); +consumer.assign(List.of(TP)); +assertEquals(1, consumer.assignment().size()); + +consumer.seek(TP, 0); +consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); +// check async commit callbacks +sendAndAwaitAsyncCommit(consumer, Optional.empty()); +} +} + +public static void testClusterResourceListener( +ClusterInstance cluster, +Map consumerConfig +) throws InterruptedException { +var numRecords = 100; +Map producerConfig = Map.of( +KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class, +VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class +); +Map consumerConfigOverrides = new HashMap<>(consumerConfig); +consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class); +consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class); Review Comment: I get your point, but it seems the tests also pass without `consumerConfig`. Could you verify that?
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
frankvicky commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2080199141

## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java: ##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class BaseConsumerTest {
+
+    public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
+    public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger();
+    public static final int BROKER_COUNT = 3;
+
+    public static class Testcase {
+
+        public static final String TOPIC = "topic";
+        public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+        public static void testSimpleConsumption(
+            ClusterInstance cluster,
+            Map config
+        ) throws InterruptedException {
+            var numRecords = 1;
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(cluster, TP, numRecords, startingTimestamp);
+            try (Consumer consumer = cluster.consumer(config)) {
+                assertEquals(0, consumer.assignment().size());
+                consumer.assign(List.of(TP));
+                assertEquals(1, consumer.assignment().size());
+
+                consumer.seek(TP, 0);
+                consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+                // check async commit callbacks
+                sendAndAwaitAsyncCommit(consumer, Optional.empty());
+            }
+        }
+
+        public static void testClusterResourceListener(
+            ClusterInstance cluster,
+            Map consumerConfig
+        ) throws InterruptedException {
+            var numRecords = 100;
+            Map producerConfig = Map.of(
+                KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+                VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+            );
+            Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+            consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+            consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
I don't get it. I tried `Map.of` on my local machine and it works just fine.
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
FrankYang0529 commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2079722269

## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java: ##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testClusterResourceListener;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testCoordinatorFailover;
+import static org.apache.kafka.clients.consumer.BaseConsumerTest.Testcase.testSimpleConsumption;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.SaslPlainPlaintextConsumerTest.MECHANISMS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG;
+import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = BaseConsumerTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"),

Review Comment:
It looks like the default value is 3. Do we need to specify it here?
https://github.com/apache/kafka/blob/98e535b5240beccba069ae10dc6692d1b842e9ac/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java#L98-L99
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
m1a2st commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2079548501

## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java: ##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class BaseConsumerTest {
+
+    public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
+    public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger();
+    public static final int BROKER_COUNT = 3;
+
+    public static class Testcase {
+
+        public static final String TOPIC = "topic";
+        public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+        public static void testSimpleConsumption(
+            ClusterInstance cluster,
+            Map config
+        ) throws InterruptedException {
+            var numRecords = 1;
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(cluster, TP, numRecords, startingTimestamp);
+            try (Consumer consumer = cluster.consumer(config)) {
+                assertEquals(0, consumer.assignment().size());
+                consumer.assign(List.of(TP));
+                assertEquals(1, consumer.assignment().size());
+
+                consumer.seek(TP, 0);
+                consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+                // check async commit callbacks
+                sendAndAwaitAsyncCommit(consumer, Optional.empty());
+            }
+        }
+
+        public static void testClusterResourceListener(
+            ClusterInstance cluster,
+            Map consumerConfig
+        ) throws InterruptedException {
+            var numRecords = 100;
+            Map producerConfig = Map.of(
+                KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+                VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+            );
+            Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+            consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+            consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
We need to override entries in `consumerConfig`, so we cannot use `Map.of` here.
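To illustrate the point of the comment above, here is a minimal sketch, assuming plain string keys and values in place of the real Kafka config constants and deserializer classes. `ConfigOverrideSketch`, `withOverrides`, and `rejectsPut` are hypothetical names used only for this illustration and are not part of the PR. It shows that a map created by `Map.of` rejects `put`, while a `HashMap` copy of the incoming config can take overrides:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigOverrideSketch {

    // Hypothetical helper (not from the PR): copy the base config into a
    // mutable HashMap, then apply the overrides on top of the copy.
    static Map<String, Object> withOverrides(Map<String, Object> base, Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.putAll(overrides);
        return merged;
    }

    // Returns true if the map refuses put(), as maps created by Map.of do.
    // Note: on a mutable map this leaves the probe entry behind.
    static boolean rejectsPut(Map<String, Object> map) {
        try {
            map.put("probe.key", "probe");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Stand-in for the consumerConfig passed into the test (assumed values).
        Map<String, Object> consumerConfig = Map.of(
            "group.id", "test-group",
            "key.deserializer", "DefaultDeserializer"
        );

        // Map.of returns an unmodifiable map, so it cannot be updated in place.
        System.out.println("Map.of rejects put: " + rejectsPut(consumerConfig));

        // Copying first keeps the existing entries and allows overriding one of them.
        Map<String, Object> overridden = withOverrides(
            consumerConfig,
            Map.of("key.deserializer", "ListenerAwareDeserializer")
        );
        System.out.println("group.id = " + overridden.get("group.id"));
        System.out.println("key.deserializer = " + overridden.get("key.deserializer"));
    }
}
```

`Map.of` remains a good fit when the map is built from scratch, as with `producerConfig` in the diff; the copy is only needed when entries from an existing map must be kept and some of them replaced.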
Re: [PR] KAFKA-19042: [8/N] Move BaseConsumerTest, SaslPlainPlaintextConsumerTest to client-integration-tests module [kafka]
frankvicky commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2079140399

## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/BaseConsumerTest.java: ##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class BaseConsumerTest {
+
+    public static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
+    public static final AtomicInteger UPDATE_CONSUMER_COUNT = new AtomicInteger();
+    public static final int BROKER_COUNT = 3;
+
+    public static class Testcase {
+
+        public static final String TOPIC = "topic";
+        public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+        public static void testSimpleConsumption(
+            ClusterInstance cluster,
+            Map config
+        ) throws InterruptedException {
+            var numRecords = 1;
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(cluster, TP, numRecords, startingTimestamp);
+            try (Consumer consumer = cluster.consumer(config)) {
+                assertEquals(0, consumer.assignment().size());
+                consumer.assign(List.of(TP));
+                assertEquals(1, consumer.assignment().size());
+
+                consumer.seek(TP, 0);
+                consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+                // check async commit callbacks
+                sendAndAwaitAsyncCommit(consumer, Optional.empty());
+            }
+        }
+
+        public static void testClusterResourceListener(
+            ClusterInstance cluster,
+            Map consumerConfig
+        ) throws InterruptedException {
+            var numRecords = 100;
+            Map producerConfig = Map.of(
+                KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+                VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+            );
+            Map consumerConfigOverrides = new HashMap<>(consumerConfig);
+            consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+            consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);

Review Comment:
`Map.of`