dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1381644928
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -695,17 +696,16 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
         return newConfigs;
     }

-    boolean maybeOverrideEnableAutoCommit() {
+    private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) {
         Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-        boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        boolean enableAutoCommit = values().containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;

Review Comment:
   For my understanding, why do we need to check `values().containsKey(ENABLE_AUTO_COMMIT_CONFIG)`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allows different implementations to co-exist under
+ * the covers.
+ *
+ * <p/>
+ *
+ * The current logic for the {@code ConsumerDelegateCreator} inspects the incoming configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to the existing, legacy group protocol.
+ * This is based on the presence and value of the {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG group.protocol}
+ * configuration. If the value is present and equal to "{@code consumer}", the {@link AsyncKafkaConsumer}
+ * will be returned. Otherwise, the {@link LegacyKafkaConsumer} will be returned.
+ *
+ * <p/>
+ *
+ * This is not to be called by end users, and callers should not attempt to determine the underlying implementation,
+ * as doing so will make such code very brittle. Users of this facility should honor the top-level {@link Consumer}
+ * API contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+    public <K, V> Consumer<K, V> create(ConsumerConfig config,
+                                        Deserializer<K> keyDeserializer,
+                                        Deserializer<V> valueDeserializer) {
+

Review Comment:
   nit: We could remove this empty line.
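Regarding the `ConsumerDelegateCreator` javadoc above: for readers following the refactor, the selection it describes boils down to something like the sketch below. The constructor shapes and the class name `ConsumerDelegateCreatorSketch` are illustrative assumptions, not the PR's exact code:

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.GroupProtocol;
    import org.apache.kafka.common.serialization.Deserializer;

    public class ConsumerDelegateCreatorSketch {

        public <K, V> Consumer<K, V> create(ConsumerConfig config,
                                            Deserializer<K> keyDeserializer,
                                            Deserializer<V> valueDeserializer) {
            // group.protocol drives the choice: "consumer" selects the new
            // KIP-848 implementation; anything else falls back to the legacy one.
            String protocol = config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG);

            if (GroupProtocol.CONSUMER.name().equalsIgnoreCase(protocol))
                // Hypothetical constructor shape; both delegate classes live in
                // org.apache.kafka.clients.consumer.internals.
                return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
            else
                return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
        }
    }

Because callers only ever see the `Consumer` interface, the delegate can change without touching `KafkaConsumer`'s public surface, which is the point of the quasi-factory the javadoc describes.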
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -278,22 +260,22 @@ public PrototypeAsyncConsumer(final Time time,
         }
     }

-    public PrototypeAsyncConsumer(LogContext logContext,
-                                  String clientId,
-                                  Deserializers<K, V> deserializers,
-                                  FetchBuffer fetchBuffer,
-                                  FetchCollector<K, V> fetchCollector,
-                                  ConsumerInterceptors<K, V> interceptors,
-                                  Time time,
-                                  ApplicationEventHandler applicationEventHandler,
-                                  BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                                  Metrics metrics,
-                                  SubscriptionState subscriptions,
-                                  ConsumerMetadata metadata,
-                                  long retryBackoffMs,
-                                  int defaultApiTimeoutMs,
-                                  List<ConsumerPartitionAssignor> assignors,
-                                  String groupId) {
+    public AsyncKafkaConsumer(LogContext logContext,

Review Comment:
   Do we need to keep this one public?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -66,18 +66,24 @@ public void testOverrideClientId() {

     @Test
     public void testOverrideEnableAutoCommit() {
-        ConsumerConfig config = new ConsumerConfig(properties);
-        boolean overrideEnableAutoCommit = config.maybeOverrideEnableAutoCommit();
-        assertFalse(overrideEnableAutoCommit);
-
-        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        config = new ConsumerConfig(properties);
-        try {
-            config.maybeOverrideEnableAutoCommit();
-            fail("Should have thrown an exception");
-        } catch (InvalidConfigurationException e) {
-            // expected
-        }
+        // Verify that our default properties (no 'enable.auto.commit' or 'group.id') are valid.
+        assertDoesNotThrow(() -> new ConsumerConfig(properties));
+
+        // Verify that explicitly disabling 'enable.auto.commit' still works.
+        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
+        assertDoesNotThrow(() -> new ConsumerConfig(properties));
+
+        // Verify that enabling 'enable.auto.commit' but without 'group.id' fails.
+        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
+        assertThrows(InvalidConfigurationException.class, () -> new ConsumerConfig(properties));
+
+        // Verify that then adding 'group.id' to the mix allows it to pass OK.
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+        assertDoesNotThrow(() -> new ConsumerConfig(properties));

Review Comment:
   nit: ditto.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -66,18 +66,24 @@ public void testOverrideClientId() {

     @Test
     public void testOverrideEnableAutoCommit() {
-        ConsumerConfig config = new ConsumerConfig(properties);
-        boolean overrideEnableAutoCommit = config.maybeOverrideEnableAutoCommit();
-        assertFalse(overrideEnableAutoCommit);
-
-        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        config = new ConsumerConfig(properties);
-        try {
-            config.maybeOverrideEnableAutoCommit();
-            fail("Should have thrown an exception");
-        } catch (InvalidConfigurationException e) {
-            // expected
-        }
+        // Verify that our default properties (no 'enable.auto.commit' or 'group.id') are valid.
+        assertDoesNotThrow(() -> new ConsumerConfig(properties));
+
+        // Verify that explicitly disabling 'enable.auto.commit' still works.
+        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
+        assertDoesNotThrow(() -> new ConsumerConfig(properties));

Review Comment:
   nit: Would it be better to actually assert the value of the property after the ConsumerConfig is created?
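Regarding the suggestion to assert the parsed value: a minimal sketch of what that could look like, assuming the value is read back via `getBoolean`, which `ConsumerConfig` inherits from `AbstractConfig`:

    import static org.junit.jupiter.api.Assertions.assertFalse;

    // After explicitly disabling auto-commit, assert the parsed value rather
    // than only asserting that construction does not throw.
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
    ConsumerConfig config = new ConsumerConfig(properties);
    assertFalse(config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));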
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java:
##########
@@ -164,7 +163,7 @@
  * Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of
  * the test.
  */
-public class KafkaConsumerTest {
+public class LegacyKafkaConsumerTest {

Review Comment:
   Are you referring to `AsyncKafkaConsumerTest`? I wonder if we should try a bit harder to keep this one for both implementations. In this PR, we could perhaps keep it as `KafkaConsumerTest` and configure it to only run against the legacy consumer, and we could parameterise it further as a follow-up. Without `KafkaConsumerTest`, we don't have any other tests validating the main "interface", and there are a ton of tests in this one.



##########
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java:
##########
@@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws Throwable {
             () -> log.info("offsetsForTime = {}", offsetsForTime.result));
         // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
         // should work.
-        consumer.beginningOffsets(timestampsToSearch.keySet());
-        consumer.endOffsets(timestampsToSearch.keySet());
+        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(timestampsToSearch.keySet());
+        Map<TopicPartition, Long> endingOffsets = consumer.endOffsets(timestampsToSearch.keySet());
+        log.trace("beginningOffsets: {}, endingOffsets: {}", beginningOffsets, endingOffsets);

Review Comment:
   It would be great if we could revert this change.
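Regarding the parameterisation idea in the `LegacyKafkaConsumerTest` comment above: a rough sketch of what running one suite against both implementations could look like, using JUnit 5's `@EnumSource`. The test name and the plumbing are assumptions for illustration, not what the PR does:

    import java.util.Locale;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.GroupProtocol;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.EnumSource;

    public class KafkaConsumerTest {

        // Sketch: run the same public-API test body once per group protocol,
        // so a single suite covers both delegate implementations.
        @ParameterizedTest
        @EnumSource(GroupProtocol.class)
        public void testSimpleConsumption(GroupProtocol groupProtocol) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
                groupProtocol.name().toLowerCase(Locale.ROOT));
            // ... build the KafkaConsumer from props and assert only against
            // the top-level Consumer contract ...
        }
    }

In this PR the source could restrict the parameter to the legacy protocol and widen it once `AsyncKafkaConsumer` passes, which matches the "parameterise it further as a follow-up" suggestion.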