Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-15 Thread via GitHub


dajac merged PR #14670:
URL: https://github.com/apache/kafka/pull/14670





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-15 Thread via GitHub


dajac commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1812496898

   I had a look at all the failed tests from the last build and they all look unrelated to me. Therefore, I am going to merge this PR to trunk. Thanks @kirktrue for the patch!





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-14 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1811579631

   Update on test failures:
   
   - `kafka.api.ClientIdQuotaTest`
 - `testQuotaOverrideDelete`: ran 50 times locally with no errors
 - `testThrottledProducerConsumer`: ran 75 times locally with no errors
   - `org.apache.kafka.connect.integration.StartAndStopLatchTest`
 - `shouldReturnFalseWhenAwaitingForStopToNeverComplete`: ran 400 times 
locally with no errors
   
   Closing and re-opening to trigger build...
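   
   (For local repetition like the runs above, JUnit 5's `@RepeatedTest` is one option—a minimal, illustrative sketch, not necessarily how these runs were executed:)
   
   ```java
   import org.junit.jupiter.api.RepeatedTest;
   
   public class FlakyTestRepetitionExample {
   
       // Executes the annotated test 50 times in one test run, which helps
       // reproduce (or rule out) intermittent failures locally.
       @RepeatedTest(50)
       public void runSuspectedFlakyTestRepeatedly() {
           // Illustrative body; the real checks live in kafka.api.ClientIdQuotaTest.
       }
   }
   ```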





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1811460391

   @dajac—here are the unique test failures with notes:
   
   - integration.kafka.server.FetchFromFollowerIntegrationTest: 
testRackAwareRangeAssignor(String).quorum=kraft
 - KAFKA-15020
   - integration.kafka.server.FetchFromFollowerIntegrationTest: 
testRackAwareRangeAssignor(String).quorum=zk
 - KAFKA-15020, same as above
   - kafka.api.ClientIdQuotaTest: testQuotaOverrideDelete(String).quorum=kraft
 - KAFKA-8107
 - **_Will investigate_**
   - kafka.api.ClientIdQuotaTest: 
testThrottledProducerConsumer(String).quorum=zk
 - KAFKA-8108
 - **_Will investigate_**
   - kafka.api.GroupEndToEndAuthorizationTest: 
testAuthentications(String).quorum=kraft
 - No Jira, but [it is flaky on `trunk` and PR 
builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=kafka.api.GroupEndToEndAuthorizationTest=testAuthentications(String)%5B2%5D)
  
   - kafka.api.TransactionsTest: testBumpTransactionalEpoch(String).quorum=kraft
 - KAFKA-15099
   - org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest: 
testMultiWorkerRestartOnlyConnector
 - KAFKA-15675
   - org.apache.kafka.connect.integration.StartAndStopLatchTest: 
shouldReturnFalseWhenAwaitingForStopToNeverComplete
 - **_Will investigate_**
   - 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest:
 testReplication()
 - No Jira, but [it is flaky on `trunk` and PR 
builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest=testReplication())
   - org.apache.kafka.controller.QuorumControllerTest: 
testBalancePartitionLeaders()
 - KAFKA-15052, marked as fixed
   - org.apache.kafka.controller.QuorumControllerTest: 
testFenceMultipleBrokers()
 - No Jira, but [it is flaky on `trunk` and PR 
builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=org.apache.kafka.controller.QuorumControllerTest=testFenceMultipleBrokers())
   - 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest:
 testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs()
 - No Jira, but [it is flaky on `trunk` and PR 
builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest=testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs())
   - org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest: 
shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2]
 - KAFKA-9868, marked as fixed
   - org.apache.kafka.streams.integration.IQv2StoreIntegrationTest: 
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_WINDOW, kind=DSL]
 - KAFKA-13714, marked as fixed
   - org.apache.kafka.streams.integration.RestoreIntegrationTest: 
shouldInvokeUserDefinedGlobalStateRestoreListener()
 - KAFKA-15659, marked as fixed
   - org.apache.kafka.tools.MetadataQuorumCommandTest: [1] Type=Raft-Combined, 
Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.7-IV1, 
Security=PLAINTEXT
 - KAFKA-15104
   - org.apache.kafka.trogdor.coordinator.CoordinatorTest: 
testTaskRequestWithOldStartMsGetsUpdated()
 - KAFKA-8115





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-13 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-13 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1808930992

   > re-triggered a build as the last one did not look good.
   
   I only just updated my branch to include the revert commits, so I wouldn't necessarily expect that test run to pass. I have started another build that includes the reverts and will keep an eye on the results.
   
   Thanks @dajac!





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-13 Thread via GitHub


dajac commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1808636945

   re-triggered a build as the last one did not look good.





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-10 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-10 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-10 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1805940138

   Re-triggered a build due to the abundance of seemingly unrelated test 
failures.
   
   For example, the first line of this test in `ClusterConnectionStatesTest` is 
failing across all builds because `localhost` is resolving to more than one IP 
address:
   
   ```java
   @Test
   public void testSingleIP() throws UnknownHostException {
       assertEquals(1, ClientUtils.resolve("localhost", singleIPHostResolver).size());

       connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
       InetAddress currAddress = connectionStates.currentAddress(nodeId1);
       connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
       assertSame(currAddress, connectionStates.currentAddress(nodeId1));
   }
   ```
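   
   A resolver stub along these lines would pin `localhost` to a single address (a minimal, hypothetical sketch; the `HostResolver` interface and its `resolve(String)` signature are assumed from the `clients` module, and the class name is illustrative):
   
   ```java
   import java.net.InetAddress;
   import java.net.UnknownHostException;
   
   import org.apache.kafka.clients.HostResolver;
   
   // Hypothetical stub: always resolves a host to exactly one address, so
   // ClientUtils.resolve("localhost", ...) cannot return multiple entries.
   public class SingleIpHostResolver implements HostResolver {
       @Override
       public InetAddress[] resolve(String host) throws UnknownHostException {
           return new InetAddress[] { InetAddress.getAllByName(host)[0] };
       }
   }
   ```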





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-10 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-09 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1388693648


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-09 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1388662985


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.
+ */
+public interface ConsumerDelegate extends Consumer {
+
+String getClientId();
+
+Metrics metricsInternal();

Review Comment:
   Renamed.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-09 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1388397862


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.
+ */
+public interface ConsumerDelegate extends Consumer {
+
+String getClientId();
+
+Metrics metricsInternal();

Review Comment:
   Ack. Let's keep it then. How about `metricsRegistry()`? I leave the name up 
to you. I don't have a strong opinion on it.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-09 Thread via GitHub


dajac commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1804311687

   > > Should we add a small comment to those to explain that this is not 
supported in the new protocol?
   > 
   > There is a comment above the three tests that use `enforceRebalance()` 
that states:
   > 
   > ```java
   > // NOTE: this test uses the enforceRebalance API which is not implemented 
in the CONSUMER group protocol.
   > ```
   > 
   > Is that sufficient?
   
   Yes.





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1802834941

   > Should we add a small comment to those to explain that this is not 
supported in the new protocol?
   
   There is a comment above the three tests that use `enforceRebalance()` that 
states:
   
   ```java
   // NOTE: this test uses the enforceRebalance API which is not implemented in 
the CONSUMER group protocol.
   ```
   
   Is that sufficient?





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1387286879


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -222,42 +214,74 @@ public void cleanup() {
 }
 }
 
-@Test
-public void testMetricsReporterAutoGeneratedClientId() {
+private static Collection bothGroupProtocols() {
+return 
Arrays.stream(GroupProtocol.values()).map(Arguments::of).collect(Collectors.toList());
+}
+
+/**
+ * A given test may choose to use the {@link GroupProtocol#GENERIC generic 
group protocol} for a number of reasons.
+ * Among the reasons for a test to do so is because it...
+ *
+ * 
+ * 
+ * ...exercises rebalancing logic that is not yet implemented in 
the
+ * {@link GroupProtocol#CONSUMER consumer group protocol}.
+ * 
+ * ...includes topic metadata that is not yet implemented in the 
consumer group protocol.
+ * ...fails, possibly due to the omission of functionality in the 
consumer group protocol.
+ * ...uses logic, timing, etc. that are not applicable to the 
consumer group protocol.
+ * 
+ *
+ * Less than half of the tests for the consumer group protocol pass as of 
now, but it's very tedious to
+ * investigate at this point due to known bugs and missing functionality.
+ */

Review Comment:
   > It would be better to group them somehow
   
   Sounds good. I'll start the investigation and open Jiras as I find new 
causes.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


dajac commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1802394898

   > 3 reference the enforceRebalance() that the CONSUMER group protocol 
intentionally doesn't implement
   
   Should we add a small comment to those to explain that this is not supported 
in the new protocol?
   
   > I'm still in a quandary about why they're now showing up. I'll continue 
digging.
   
   Thanks!





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1387013819


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -222,42 +214,74 @@ public void cleanup() {
 }
 }
 
-@Test
-public void testMetricsReporterAutoGeneratedClientId() {
+private static Collection bothGroupProtocols() {
+return 
Arrays.stream(GroupProtocol.values()).map(Arguments::of).collect(Collectors.toList());
+}
+
+/**
+ * A given test may choose to use the {@link GroupProtocol#GENERIC generic 
group protocol} for a number of reasons.
+ * Among the reasons for a test to do so is because it...
+ *
+ * 
+ * 
+ * ...exercises rebalancing logic that is not yet implemented in 
the
+ * {@link GroupProtocol#CONSUMER consumer group protocol}.
+ * 
+ * ...includes topic metadata that is not yet implemented in the 
consumer group protocol.
+ * ...fails, possibly due to the omission of functionality in the 
consumer group protocol.
+ * ...uses logic, timing, etc. that are not applicable to the 
consumer group protocol.
+ * 
+ *
+ * Less than half of the tests for the consumer group protocol pass as of 
now, but it's very tedious to
+ * investigate at this point due to known bugs and missing functionality.
+ */

Review Comment:
   @kirktrue It would be better to group them somehow but I let you judge.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1802288411

   > @kirktrue The build is red. Could you please take a look?
   
   The build failures are due to the "Dodgy code" warnings that SpotBugs has recently started issuing.
   
   SpotBugs reports three violations of the 
`RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT` check because we're ignoring the 
return value of `Consumer` method calls in three instances.
   
   * Two of the instances are in `ClientCompatibilityTest`'s `testConsume` 
method because it ignores the return value of `beginningOffsets()` and 
`endOffsets()`
   * The third instance is in `StreamsResetter`'s `maybeResetInputAndSeekToEndIntermediateTopicOffsets` because it doesn't use the value that `position()` returns
   
   I'm still in a quandary about why they're now showing up. I'll continue 
digging.
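   
   For illustration, this is the shape of code that trips `RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT`, along with the obvious fix (a hypothetical sketch, not the actual `ClientCompatibilityTest` or `StreamsResetter` code):
   
   ```java
   import java.time.Duration;
   import java.util.Collection;
   import java.util.Map;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.TopicPartition;
   
   public class ReturnValueCheckExample {
   
       // Flagged by SpotBugs: beginningOffsets() has no side effect, so discarding
       // its return value looks like a bug.
       static void flagged(Consumer<byte[], byte[]> consumer, Collection<TopicPartition> partitions) {
           consumer.beginningOffsets(partitions, Duration.ofSeconds(30));
       }
   
       // Not flagged: the returned offsets are actually used by the caller.
       static Map<TopicPartition, Long> fixed(Consumer<byte[], byte[]> consumer, Collection<TopicPartition> partitions) {
           return consumer.beginningOffsets(partitions, Duration.ofSeconds(30));
       }
   }
   ```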





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386913209


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -1893,8 +2014,9 @@ private void consumerCloseTest(final long closeTimeoutMs,
 }
 }
 
-@Test
-public void testPartitionsForNonExistingTopic() {
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")
+public void testPartitionsForNonExistingTopic(GroupProtocol groupProtocol) 
{

Review Comment:
   Most, yes. There are some `enforceRebalance()` tests which—as I understand 
it—don't make sense to "port" over to the new group protocol.









Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386911868


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -835,16 +919,17 @@ public void testMissingOffsetNoResetPolicy() {
 assertThrows(NoOffsetForPartitionException.class, () -> 
consumer.poll(Duration.ZERO));
 }
 
-@Test
-public void testResetToCommittedOffset() {
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Resolving this conversation as it is being handled in a PR-level 
conversation.



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -814,16 +897,17 @@ private void initMetadata(MockClient mockClient, 
Map partitionC
 mockClient.updateMetadata(initialMetadata);
 }
 
-@Test
-public void testMissingOffsetNoResetPolicy() {
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Resolving this conversation as it is being handled in a PR-level 
conversation.



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -857,16 +942,17 @@ public void testResetToCommittedOffset() {
 assertEquals(539L, consumer.position(tp0));
 }
 
-@Test
-public void testResetUsingAutoResetPolicy() {
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Resolving this conversation as it is being handled in a PR-level 
conversation.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386911536


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -721,16 +802,17 @@ public void 
verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
 assertEquals(55L, consumer.position(tp0));
 }
 
-@Test
-public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() 
{
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Resolving this conversation as it is being handled in a PR-level 
conversation.



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -757,8 +839,9 @@ public void 
verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() {
 assertEquals(55, consumer.committed(Collections.singleton(tp0), 
Duration.ZERO).get(tp0).offset());
 }
 
-@Test
-public void testFetchProgressWithMissingPartitionPosition() {
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Resolving this conversation as it is being handled in a PR-level 
conversation.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386912274


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -881,24 +967,26 @@ public void testResetUsingAutoResetPolicy() {
 assertEquals(50L, consumer.position(tp0));
 }
 
-@Test
-public void testOffsetIsValidAfterSeek() {
+@ParameterizedTest
+@MethodSource("bothGroupProtocols")
+public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) {
 SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.LATEST);
 ConsumerMetadata metadata = createMetadata(subscription);
 MockClient client = new MockClient(time, metadata);
 
 initMetadata(client, Collections.singletonMap(topic, 1));
 
-consumer = newConsumer(time, client, subscription, metadata, assignor,
+consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor,
 true, groupId, Optional.empty(), false);
 consumer.assign(singletonList(tp0));
 consumer.seek(tp0, 20L);
 consumer.poll(Duration.ZERO);
 assertEquals(subscription.validPosition(tp0).offset, 20L);
 }
 
-@Test
-public void testCommitsFetchedDuringAssign() {
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Resolving this conversation as it is being handled in a PR-level 
conversation.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386910396


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -222,42 +214,74 @@ public void cleanup() {
 }
 }
 
-@Test
-public void testMetricsReporterAutoGeneratedClientId() {
+private static Collection bothGroupProtocols() {
+return 
Arrays.stream(GroupProtocol.values()).map(Arguments::of).collect(Collectors.toList());
+}
+
+/**
+ * A given test may choose to use the {@link GroupProtocol#GENERIC generic 
group protocol} for a number of reasons.
+ * Among the reasons for a test to do so is because it...
+ *
+ * 
+ * 
+ * ...exercises rebalancing logic that is not yet implemented in 
the
+ * {@link GroupProtocol#CONSUMER consumer group protocol}.
+ * 
+ * ...includes topic metadata that is not yet implemented in the 
consumer group protocol.
+ * ...fails, possibly due to the omission of functionality in the 
consumer group protocol.
+ * ...uses logic, timing, etc. that are not applicable to the 
consumer group protocol.
+ * 
+ *
+ * Less than half of the tests for the consumer group protocol pass as of 
now, but it's very tedious to
+ * investigate at this point due to known bugs and missing functionality.
+ */

Review Comment:
   @dajac—I've made the change to use `EnumSource` everywhere. Thanks for that 
suggestion 
   
   As far as the logistics for tracking these ~29 bugs via Jiras—do you suggest 
we file separate Jiras for each test failure, an "uber" Jira for all of the 
failures, or something else?



failures, or something else?






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386907003


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.
+ */
+public interface ConsumerDelegate extends Consumer {
+
+String getClientId();
+
+Metrics metricsInternal();

Review Comment:
   It doesn't look like it will be trivial to remove the references to `Metrics` entirely. A couple of cases are straightforward, but several tests use the metrics reporters that the `Metrics` class exposes, so we would need to keep something like `metricsInternal()` around.
   
   Would it help to change the method name from `metricsInternal()` to something else?






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386903933


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -211,4 +226,17 @@ else if (t instanceof KafkaException)
 throw new TimeoutException(e);
 }
 }
+
+public static boolean maybeOverrideEnableAutoCommit(ConsumerConfig config) 
{

Review Comment:
   @dajac—following up on this—are you OK with this change?









Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1802247627

   An update on the `KafkaConsumerTest`—
   
   Of the 100 tests, 39 successfully pass using both group protocols. 61 have 
failures when using the `CONSUMER` group protocol, so they have been hardcoded 
to only use `GENERIC` for now. Of those 61 tests:
   
   - 29 trigger bugs when using the new `AsyncKafkaConsumer` (AKA CTR consumer) 
that the `CONSUMER` group protocol leverages 
   - 24 reference rebalance logic that is currently unimplemented in the 
`CONSUMER` group protocol
   - 2 reference topic metadata logic that is currently unimplemented in the 
`CONSUMER` group protocol
   - 3 reference RPCs that are used only by the `GENERIC` group protocol
   - 3 reference the `enforceRebalance()` that the `CONSUMER` group protocol 
intentionally doesn't implement
   
   It will be one of my top priorities to circle back to the 'bugs' to investigate and fix them.





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386903370


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
  */
 static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
"internal.leave.group.on.close";
 
-/**
- * internal.throw.on.fetch.stable.offset.unsupported
- * Whether or not the consumer should throw when the new stable offset 
feature is supported.
- * If set to true then the client shall crash upon hitting it.
- * The purpose of this flag is to prevent unexpected broker downgrade 
which makes
- * the offset fetch protection against pending commit invalid. The safest 
approach
- * is to fail fast to avoid introducing correctness issue.
- *
- * 
- * Note: this is an internal configuration and could be changed in the 
future in a backward incompatible way
- *
- */
-static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = 
"internal.throw.on.fetch.stable.offset.unsupported";

Review Comment:
   @dajac—just following up on this point, specifically. Are you OK with this 
change? It's still awkward, I know, so if you have other ideas, let me know. 
Thanks!









Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


dajac commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1801373056

   @kirktrue The build is red. Could you please take a look?





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386239113


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -222,42 +214,74 @@ public void cleanup() {
 }
 }
 
-@Test
-public void testMetricsReporterAutoGeneratedClientId() {
+private static Collection bothGroupProtocols() {
+return 
Arrays.stream(GroupProtocol.values()).map(Arguments::of).collect(Collectors.toList());
+}
+
+/**
+ * A given test may choose to use the {@link GroupProtocol#GENERIC generic 
group protocol} for a number of reasons.
+ * Among the reasons for a test to do so is because it...
+ *
+ * 
+ * 
+ * ...exercises rebalancing logic that is not yet implemented in 
the
+ * {@link GroupProtocol#CONSUMER consumer group protocol}.
+ * 
+ * ...includes topic metadata that is not yet implemented in the 
consumer group protocol.
+ * ...fails, possibly due to the omission of functionality in the 
consumer group protocol.
+ * ...uses logic, timing, etc. that are not applicable to the 
consumer group protocol.
+ * 
+ *
+ * Less than half of the tests for the consumer group protocol pass as of 
now, but it's very tedious to
+ * investigate at this point due to known bugs and missing functionality.
+ */

Review Comment:
   Yeah, I would also use `EnumSource` for those ones and I would remove this 
comment. We should rather track the missing parts with Jiras.
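   
   For reference, switching those tests from `@MethodSource` helpers to `@EnumSource` would look roughly like this (a minimal sketch; the test bodies are illustrative, not from `KafkaConsumerTest`):
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertNotNull;
   
   import org.apache.kafka.clients.consumer.GroupProtocol;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.EnumSource;
   
   public class GroupProtocolParameterizationExample {
   
       // Runs once per GroupProtocol value (GENERIC and CONSUMER).
       @ParameterizedTest
       @EnumSource(GroupProtocol.class)
       public void testAgainstBothProtocols(GroupProtocol groupProtocol) {
           assertNotNull(groupProtocol);
       }
   
       // Restricts the run to the generic protocol only, replacing a @MethodSource helper.
       @ParameterizedTest
       @EnumSource(value = GroupProtocol.class, names = "GENERIC")
       public void testGenericProtocolOnly(GroupProtocol groupProtocol) {
           assertNotNull(groupProtocol);
       }
   }
   ```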






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386232830


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -247,12 +247,10 @@ private void closeInternal(final Duration timeout) {
 closeTimeout = timeout;
 wakeup();
 
-if (timeoutMs > 0) {
-try {
-join(timeoutMs);
-} catch (InterruptedException e) {
-log.error("Interrupted while waiting for consumer network 
thread to complete", e);
-}
+try {

Review Comment:
   We can keep it here. I just wanted to understand the reason behind it.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1386231835


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.
+ */
+public interface ConsumerDelegate extends Consumer {
+
+String getClientId();
+
+Metrics metricsInternal();

Review Comment:
   Thanks.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-08 Thread via GitHub


dajac commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1801359691

   Thanks for the explanation, @kirktrue. I agree with you that we should not block this PR on fixing all those tests. We can continue to fix them in subsequent PRs. For the tests only applicable to the old protocol (1), I agree that it does not make sense to touch them. However, we may need to add equivalent ones for the new protocol, don't we?





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385683761


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -721,16 +802,17 @@ public void 
verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
 assertEquals(55L, consumer.position(tp0));
 }
 
-@Test
-public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() 
{
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Yes, it should. I spent about half a day attempting to investigate the 
underlying issue. I wasn't sure if we should spend that time in this PR or in 
separate PRs. I sent a separate message to that end, and I'll circle back to 
these later.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385682384


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -222,42 +214,74 @@ public void cleanup() {
 }
 }
 
-@Test
-public void testMetricsReporterAutoGeneratedClientId() {
+private static Collection bothGroupProtocols() {
+return 
Arrays.stream(GroupProtocol.values()).map(Arguments::of).collect(Collectors.toList());
+}
+
+/**
+ * A given test may choose to use the {@link GroupProtocol#GENERIC generic 
group protocol} for a number of reasons.
+ * Among the reasons for a test to do so is because it...
+ *
+ * 
+ * 
+ * ...exercises rebalancing logic that is not yet implemented in 
the
+ * {@link GroupProtocol#CONSUMER consumer group protocol}.
+ * 
+ * ...includes topic metadata that is not yet implemented in the 
consumer group protocol.
+ * ...fails, possibly due to the omission of functionality in the 
consumer group protocol.
+ * ...uses logic, timing, etc. that are not applicable to the 
consumer group protocol.
+ * 
+ *
+ * Less than half of the tests for the consumer group protocol pass as of 
now, but it's very tedious to
+ * investigate at this point due to known bugs and missing functionality.
+ */

Review Comment:
   Just so I'm clear, you'd like me to remove this method and then add a more specific reason for skipping the new consumer group protocol at each test method?






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385680899


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -211,4 +226,17 @@ else if (t instanceof KafkaException)
 throw new TimeoutException(e);
 }
 }
+
+public static boolean maybeOverrideEnableAutoCommit(ConsumerConfig config) 
{

Review Comment:
   It has now been changed to override the configuration. LMK if you think that 
is a good idea, or if I should revert it.
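   
   For reference, the override idea is roughly the following (a minimal, hypothetical sketch of the intent, not the PR's exact code): when no `group.id` is configured, auto-commit is forced off, and an explicit `enable.auto.commit=true` is rejected.
   
   ```java
   import java.util.Optional;
   
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.common.errors.InvalidConfigurationException;
   
   final class AutoCommitOverrideSketch {
       // Hypothetical helper: returns the effective enable.auto.commit value,
       // forcing it to false when the consumer has no group id.
       static boolean maybeOverrideEnableAutoCommit(ConsumerConfig config) {
           Optional<String> groupId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
           boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
   
           if (!groupId.isPresent()) {
               // If the user did not set the value explicitly, silently disable auto-commit.
               if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
                   return false;
               // Otherwise fail fast: auto-commit requires a group id.
               if (enableAutoCommit)
                   throw new InvalidConfigurationException(
                           ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when a default (null) group id is used.");
           }
           return enableAutoCommit;
       }
   }
   ```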






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1800305244

   `KafkaConsumerTest` is a test suite that exercised the functionality of the 
"existing" `KafkaConsumer` with 100 tests. With this change, the suite was 
refactored to allow each of those tests to be parameterized, such that it now 
tests both `LegacyKafkaConsumer` and `AsyncKafkaConsumer`.
   
   Ideally, those 100 tests would pass for both implementations, and we would 
have 200 passing tests.
   
   However, though the `LegacyKafkaConsumer` still passes all 100 tests in 
`KafkaConsumerTest`, the `AsyncKafkaConsumer` fails on 56 tests. The failures 
can be classified into the following reasons:
   
   1. The test is only applicable for the old group protocol
   2. The rebalancing logic is not yet implemented
   3. The topic metadata logic is not yet implemented
   4. The low-level CTR logic is incorrect
   
   To the first point—some of the tests directly reference RPC calls from the 
old protocol and thus those tests won't pass without some effort in refactoring 
the tests. It may make sense to leave those alone versus bending things around 
to make them apply to the new protocol. As for the second and third points, at 
this point in development, we don't expect tests that exercise rebalance or 
topic metadata logic to work. For the fourth point, we are "gaining" new tests 
that stress the CTR layer in ways that we haven't tickled with our existing 
unit tests or integration tests.
   
   Given this situation, I see two options forward:
   
   1. Fix failing tests as part of this PR
   2. Note which tests are failing, file Jiras, and fix them in future PRs
   
   The second option allows us to merge in the underlying delegate change 
sooner, preventing it from blocking end-to-end testing. The first option is 
more holistic and doesn't attempt to shoehorn a half-baked test suite into 
`trunk`.
   
   I take the view that we gained 44 additional passing tests with this change 
and have a followup task of gaining upward of 56 more soon.
   
   @dajac @AndrewJSchofield @philipnee @lianetm—any opinions?
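
   As a side note for readers of this archive, here is a minimal sketch of the
   parameterization pattern described above. The helper and test names follow
   the quoted diffs; the class name and test body are placeholders only.

   ```java
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.stream.Collectors;

   import org.apache.kafka.clients.consumer.GroupProtocol;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.Arguments;
   import org.junit.jupiter.params.provider.MethodSource;

   public class ParameterizedConsumerTestSketch {

       // Supplies both group protocols so each test runs once against the
       // legacy consumer and once against the new async consumer.
       private static Collection<Arguments> bothGroupProtocols() {
           return Arrays.stream(GroupProtocol.values())
                        .map(Arguments::of)
                        .collect(Collectors.toList());
       }

       @ParameterizedTest
       @MethodSource("bothGroupProtocols")
       public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol) {
           // The real test builds a consumer configured with the given
           // group.protocol and asserts on its metrics reporters; omitted here.
       }
   }
   ```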


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385631897


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385631654


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -222,42 +214,74 @@ public void cleanup() {
 }
 }
 
-@Test
-public void testMetricsReporterAutoGeneratedClientId() {
+private static Collection bothGroupProtocols() {
+return 
Arrays.stream(GroupProtocol.values()).map(Arguments::of).collect(Collectors.toList());
+}
+
+/**
+ * A given test may choose to use the {@link GroupProtocol#GENERIC generic 
group protocol} for a number of reasons.
+ * Among the reasons for a test to do so is because it...
+ *
+ * 
+ * 
+ * ...exercises rebalancing logic that is not yet implemented in 
the
+ * {@link GroupProtocol#CONSUMER consumer group protocol}.
+ * 
+ * ...includes topic metadata that is not yet implemented in the 
consumer group protocol.
+ * ...fails, possibly due to the omission of functionality in the 
consumer group protocol.
+ * ...uses logic, timing, etc. that are not applicable to the 
consumer group protocol.
+ * 
+ *
+ * Less than half of the tests for the consumer group protocol pass as of 
now, but it's very tedious to
+ * investigate at this point due to known bugs and missing functionality.
+ */
+private static Collection genericGroupProtocolOnly() {
+return Collections.singleton(Arguments.of(GroupProtocol.GENERIC));
+}
+
+@ParameterizedTest
+@MethodSource("bothGroupProtocols")

Review Comment:
   Done.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.

Review Comment:
   Done.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385620881


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -222,42 +214,74 @@ public void cleanup() {
 }
 }
 
-@Test
-public void testMetricsReporterAutoGeneratedClientId() {
+private static Collection bothGroupProtocols() {
+return 
Arrays.stream(GroupProtocol.values()).map(Arguments::of).collect(Collectors.toList());
+}
+
+/**
+ * A given test may choose to use the {@link GroupProtocol#GENERIC generic 
group protocol} for a number of reasons.
+ * Among the reasons for a test to do so is because it...
+ *
+ * 
+ * 
+ * ...exercises rebalancing logic that is not yet implemented in 
the
+ * {@link GroupProtocol#CONSUMER consumer group protocol}.
+ * 
+ * ...includes topic metadata that is not yet implemented in the 
consumer group protocol.
+ * ...fails, possibly due to the omission of functionality in the 
consumer group protocol.
+ * ...uses logic, timing, etc. that are not applicable to the 
consumer group protocol.
+ * 
+ *
+ * Less than half of the tests for the consumer group protocol pass as of 
now, but it's very tedious to
+ * investigate at this point due to known bugs and missing functionality.
+ */
+private static Collection genericGroupProtocolOnly() {
+return Collections.singleton(Arguments.of(GroupProtocol.GENERIC));
+}
+
+@ParameterizedTest
+@MethodSource("bothGroupProtocols")

Review Comment:
   Because I didn't know of its existence.
   
   I'll make that change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385620217


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -247,12 +247,10 @@ private void closeInternal(final Duration timeout) {
 closeTimeout = timeout;
 wakeup();
 
-if (timeoutMs > 0) {
-try {
-join(timeoutMs);
-} catch (InterruptedException e) {
-log.error("Interrupted while waiting for consumer network 
thread to complete", e);
-}
+try {

Review Comment:
   Because one of the unit tests fails without it.
   
   In the existing `KafkaConsumer.close()`, the coordinator eventually calls 
this method:
   
   ```java
   private void closeHeartbeatThread() {
   HeartbeatThread thread;
   synchronized (this) {
   if (heartbeatThread == null)
   return;
   heartbeatThread.close();
   thread = heartbeatThread;
   heartbeatThread = null;
   }
   try {
   thread.join();
   } catch (InterruptedException e) {
   log.warn("Interrupted while waiting for consumer heartbeat 
thread to close");
   throw new InterruptException(e);
   }
   }
   ```
   
   By blocking via `Thread.join()`, it allows the coordinator to close fully 
before its underlying resources (client, etc.) are closed. In contrast, our 
background thread was only waiting up to the timeout, and then proceeded with 
closing the other resources even though the background thread may not have 
completed its cleanup.
   
   If you'd prefer, I can save this change for a separate PR?
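
   For readers of the archive, a hedged sketch of what the revised close logic
   might look like under this approach (the hunk above is truncated, so this is
   an approximation rather than the verbatim diff):

   ```java
   // Sketch: block until the consumer network thread finishes its cleanup,
   // mirroring the closeHeartbeatThread() pattern quoted above.
   try {
       join();
   } catch (InterruptedException e) {
       log.error("Interrupted while waiting for consumer network thread to complete", e);
   }
   ```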



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385617039


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.

Review Comment:
   Will change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385615790


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.
+ */
+public interface ConsumerDelegate extends Consumer {
+
+String getClientId();
+
+Metrics metricsInternal();

Review Comment:
   There's already an existing `metrics()` method in `Consumer`:
   
   ```
   Map<MetricName, ? extends Metric> metrics();
   ```
   
   I will take another look to see if the uses of `metricsInternal()` can be 
reworked to use `metrics()` and then the whole thing can be avoided.
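
   If that rework pans out, a test could inspect metrics through the public API
   along these lines (a sketch only; the metric name shown is just an example of
   an existing consumer metric):

   ```java
   // Sketch: look up a metric by name via the public Consumer#metrics() map
   // instead of reaching into the internal Metrics registry.
   Metric lastPoll = null;
   for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
       if ("last-poll-seconds-ago".equals(entry.getKey().name()))
           lastPoll = entry.getValue();
   }
   assertNotNull(lastPoll);
   ```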



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385614421


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.

Review Comment:
   Will do.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.
+ */
+public interface ConsumerDelegate extends Consumer {
+
+String getClientId();

Review Comment:
   Good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385612995


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-07 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1384566593


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.

Review Comment:
   nit: Should we make it clear that this interface is only used internally?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -247,12 +247,10 @@ private void closeInternal(final Duration timeout) {
 closeTimeout = timeout;
 wakeup();
 
-if (timeoutMs > 0) {
-try {
-join(timeoutMs);
-} catch (InterruptedException e) {
-log.error("Interrupted while waiting for consumer network 
thread to complete", e);
-}
+try {

Review Comment:
   Why do we need this change?



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -835,16 +919,17 @@ public void testMissingOffsetNoResetPolicy() {
 assertThrows(NoOffsetForPartitionException.class, () -> 
consumer.poll(Duration.ZERO));
 }
 
-@Test
-public void testResetToCommittedOffset() {
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Same. Actually, it seems to me that all the tests relying on 
`consumer.assign` should work, no?



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -721,16 +802,17 @@ public void 
verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
 assertEquals(55L, consumer.position(tp0));
 }
 
-@Test
-public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() 
{
+@ParameterizedTest
+@MethodSource("genericGroupProtocolOnly")

Review Comment:
   Why is this one only applied to the generic protocol? It seems that it 
should also work for the new one.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-06 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1384203237


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -66,18 +66,24 @@ public void testOverrideClientId() {
 
 @Test
 public void testOverrideEnableAutoCommit() {
-ConsumerConfig config = new ConsumerConfig(properties);
-boolean overrideEnableAutoCommit = 
config.maybeOverrideEnableAutoCommit();
-assertFalse(overrideEnableAutoCommit);
-
-properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true");
-config = new ConsumerConfig(properties);
-try {
-config.maybeOverrideEnableAutoCommit();
-fail("Should have thrown an exception");
-} catch (InvalidConfigurationException e) {
-// expected
-}
+// Verify that our default properties (no 'enable.auto.commit' or 
'group.id') are valid.
+assertDoesNotThrow(() -> new ConsumerConfig(properties));
+
+// Verify that explicitly disabling 'enable.auto.commit' still works.
+properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.FALSE.toString());
+assertDoesNotThrow(() -> new ConsumerConfig(properties));
+
+// Verify that enabling 'enable.auto.commit' but without 'group.id' 
fails.
+properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.TRUE.toString());
+assertThrows(InvalidConfigurationException.class, () -> new 
ConsumerConfig(properties));
+
+// Verify that then adding 'group.id' to the mix allows it to pass OK.
+properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+assertDoesNotThrow(() -> new ConsumerConfig(properties));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-06 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1384203100


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -66,18 +66,24 @@ public void testOverrideClientId() {
 
 @Test
 public void testOverrideEnableAutoCommit() {
-ConsumerConfig config = new ConsumerConfig(properties);
-boolean overrideEnableAutoCommit = 
config.maybeOverrideEnableAutoCommit();
-assertFalse(overrideEnableAutoCommit);
-
-properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true");
-config = new ConsumerConfig(properties);
-try {
-config.maybeOverrideEnableAutoCommit();
-fail("Should have thrown an exception");
-} catch (InvalidConfigurationException e) {
-// expected
-}
+// Verify that our default properties (no 'enable.auto.commit' or 
'group.id') are valid.
+assertDoesNotThrow(() -> new ConsumerConfig(properties));
+
+// Verify that explicitly disabling 'enable.auto.commit' still works.
+properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.FALSE.toString());
+assertDoesNotThrow(() -> new ConsumerConfig(properties));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-06 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1384200253


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java:
##
@@ -164,7 +163,7 @@
  * Note to future authors in this class. If you close the consumer, close with 
DURATION.ZERO to reduce the duration of
  * the test.
  */
-public class KafkaConsumerTest {
+public class LegacyKafkaConsumerTest {

Review Comment:
   I have updated all of the tests to support parameterization. However, about 
half of them only support the legacy protocol because when we exercise the new 
protocol, the test fails. The Javadoc of the method used as the parameter 
'source' explains this in more detail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-03 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1381991350


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -278,22 +260,22 @@ public PrototypeAsyncConsumer(final Time time,
 }
 }
 
-public PrototypeAsyncConsumer(LogContext logContext,
-  String clientId,
-  Deserializers deserializers,
-  FetchBuffer fetchBuffer,
-  FetchCollector fetchCollector,
-  ConsumerInterceptors interceptors,
-  Time time,
-  ApplicationEventHandler 
applicationEventHandler,
-  BlockingQueue 
backgroundEventQueue,
-  Metrics metrics,
-  SubscriptionState subscriptions,
-  ConsumerMetadata metadata,
-  long retryBackoffMs,
-  int defaultApiTimeoutMs,
-  List assignors,
-  String groupId) {
+public AsyncKafkaConsumer(LogContext logContext,

Review Comment:
   No. Made package private.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@link 
ConsumerConfig#GROUP_PROTOCOL_CONFIG group.protocol}
+ * configuration. If the value is present and equal to {@code 
consumer}, the {@link AsyncKafkaConsumer}
+ * will be returned. Otherwise, the {@link LegacyKafkaConsumer} will be 
returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+public  Consumer create(ConsumerConfig config,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+

Review Comment:
   Done.
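
   As an aside for readers, a minimal sketch of the selection logic that the
   quoted Javadoc describes (the constructor arguments shown are assumptions;
   the real create() method wires up considerably more state):

   ```java
   // Sketch: pick the delegate implementation from the group.protocol config,
   // falling back to the legacy consumer when the new protocol is not requested.
   GroupProtocol groupProtocol = GroupProtocol.valueOf(
           config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));

   if (groupProtocol == GroupProtocol.CONSUMER)
       return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
   else
       return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
   ```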



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-03 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1381991173


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -695,17 +696,16 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) 
== null)
 return newConfigs;
 }
 
-boolean maybeOverrideEnableAutoCommit() {
+private void maybeOverrideEnableAutoCommit(Map configs) {
 Optional groupId = 
Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-boolean enableAutoCommit = 
getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+boolean enableAutoCommit = 
values().containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? 
getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;

Review Comment:
   We can't call `getBoolean` because if the value isn't there, it throws an 
exception. We have to look at the `originals` to get the values that were 
passed in by the user.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-03 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1381990015


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-03 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1381989627


##
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##
@@ -281,7 +281,7 @@ private int 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-03 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1381644928


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -695,17 +696,16 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) 
== null)
 return newConfigs;
 }
 
-boolean maybeOverrideEnableAutoCommit() {
+private void maybeOverrideEnableAutoCommit(Map configs) {
 Optional groupId = 
Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-boolean enableAutoCommit = 
getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+boolean enableAutoCommit = 
values().containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? 
getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;

Review Comment:
   For my understanding, why do we need to check 
`values().containsKey(ENABLE_AUTO_COMMIT_CONFIG)`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@link 
ConsumerConfig#GROUP_PROTOCOL_CONFIG group.protocol}
+ * configuration. If the value is present and equal to {@code 
consumer}, the {@link AsyncKafkaConsumer}
+ * will be returned. Otherwise, the {@link LegacyKafkaConsumer} will be 
returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+public  Consumer create(ConsumerConfig config,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+

Review Comment:
   nit: We could remove this empty line.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -278,22 +260,22 @@ public PrototypeAsyncConsumer(final Time time,
 }
 }
 
-public PrototypeAsyncConsumer(LogContext logContext,
-  String clientId,
-  Deserializers deserializers,
-  FetchBuffer fetchBuffer,
-  FetchCollector fetchCollector,
-  ConsumerInterceptors interceptors,
-  Time time,
-  ApplicationEventHandler 
applicationEventHandler,
-  BlockingQueue 
backgroundEventQueue,
-  Metrics metrics,
-  SubscriptionState subscriptions,
-  ConsumerMetadata metadata,
-  long retryBackoffMs,
-  int defaultApiTimeoutMs,
-  List assignors,
-  String groupId) {
+public AsyncKafkaConsumer(LogContext logContext,

Review Comment:
   Do we need to keep this one?

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380747077


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -67,17 +67,11 @@ public void testOverrideClientId() {
 @Test
 public void testOverrideEnableAutoCommit() {
 ConsumerConfig config = new ConsumerConfig(properties);
-boolean overrideEnableAutoCommit = 
config.maybeOverrideEnableAutoCommit();
-assertFalse(overrideEnableAutoCommit);
+//boolean overrideEnableAutoCommit = 
InternalConsumerConfig.maybeOverrideEnableAutoCommit(config);

Review Comment:
   Agreed. I revised this test method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380746726


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380719222


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
  */
 static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
"internal.leave.group.on.close";
 
-/**
- * internal.throw.on.fetch.stable.offset.unsupported
- * Whether or not the consumer should throw when the new stable offset 
feature is supported.
- * If set to true then the client shall crash upon hitting it.
- * The purpose of this flag is to prevent unexpected broker downgrade 
which makes
- * the offset fetch protection against pending commit invalid. The safest 
approach
- * is to fail fast to avoid introducing correctness issue.
- *
- * 
- * Note: this is an internal configuration and could be changed in the 
future in a backward incompatible way
- *
- */
-static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = 
"internal.throw.on.fetch.stable.offset.unsupported";

Review Comment:
   I have reverted this change. I have left 
`THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` in `ConsumerConfig` but I've added a 
string constant in `ConsumerUtils` just so it can be referenced in the 
`internals` package.
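
   For illustration, a rough sketch of that arrangement (declaration details are assumed; only the literal is taken from the javadoc quoted above):

   ```
   // In the internals package: re-declare the same literal so internals code can
   // reference it without widening ConsumerConfig's visibility. The canonical,
   // package-private definition stays in ConsumerConfig.
   public final class ConsumerUtils {
       public static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED =
           "internal.throw.on.fetch.stable.offset.unsupported";
   }
   ```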



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380718362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer 
timer) {
 }
 }
 
-// This is here temporary as we don't have public access to the 
ConsumerConfig in this module.
-public static Map appendDeserializerToConfig(Map configs,
- 
Deserializer keyDeserializer,
- 
Deserializer valueDeserializer) {
-// validate deserializer configuration, if the passed deserializer 
instance is null, the user must explicitly set a valid deserializer 
configuration value
-Map newConfigs = new HashMap<>(configs);
-if (keyDeserializer != null)
-newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer.getClass());
-else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, 
"must be non-null.");
-if (valueDeserializer != null)
-newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer.getClass());
-else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, 
"must be non-null.");
-return newConfigs;
-}
-

Review Comment:
   This appears to be outdated: the method was not moved to `ConsumerUtils`; it stays in
`ConsumerConfig`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380717397


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -666,19 +651,6 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) 
== null)
 return newConfigs;
 }
 
-boolean maybeOverrideEnableAutoCommit() {
-Optional groupId = 
Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-boolean enableAutoCommit = 
getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-if (!groupId.isPresent()) { // overwrite in case of default group id 
where the config is not explicitly provided
-if (!originals().containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
-enableAutoCommit = false;
-} else if (enableAutoCommit) {
-throw new 
InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " 
cannot be set to true when default group id (null) is used.");
-}
-}
-return enableAutoCommit;
-}
-

Review Comment:
   This is now outdated. It's back in `ConsumerConfig`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380717923


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -250,7 +255,7 @@ public PrototypeAsyncConsumer(final Time time,
 // no coordinator will be constructed for the default (null) group 
id
 if (!groupId.isPresent()) {
 config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-
//config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+
config.ignore(ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);

Review Comment:
   I've moved the core variable back to `ConsumerConfig` but left a copy in 
`ConsumerUtils` with an explanation of why it's redefined there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


AndrewJSchofield commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380705419


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380667322


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");

Review Comment:
   Agreed. This code was written before the `ConsumerConfig` changes were in place. I've updated it to use `ConsumerConfig` directly.
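
   For context, a rough sketch of what the `ConsumerConfig`-based check can look like, assuming the `GROUP_PROTOCOL_CONFIG` constant and `GroupProtocol` enum added by the earlier config changes:

   ```
   // Decide which delegate to build from the already-parsed ConsumerConfig
   // instead of probing the raw configuration map.
   GroupProtocol groupProtocol = GroupProtocol.valueOf(
       config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));

   if (groupProtocol == GroupProtocol.CONSUMER)
       return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
   else
       return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
   ```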



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r138030


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380667051


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;

Review Comment:
   This code was written before the `ConsumerConfig` changes were in place. I've updated it to use `ConsumerConfig` directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380664987


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java:
##
@@ -2582,7 +2581,7 @@ private FetchResponse fetchResponse(TopicPartition 
partition, long fetchOffset,
 return fetchResponse(Collections.singletonMap(partition, fetchInfo));
 }
 
-private KafkaConsumer newConsumer(Time time,
+private LegacyKafkaConsumer newConsumer(Time time,
   KafkaClient client,

Review Comment:
   Thanks. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380663703


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumerTest.java:
##
@@ -164,7 +163,7 @@
  * Note to future authors in this class. If you close the consumer, close with 
DURATION.ZERO to reduce the duration of
  * the test.
  */
-public class KafkaConsumerTest {
+public class LegacyKafkaConsumerTest {

Review Comment:
   We have similar tests. My other attempt at the delegate work was much 
_deeper_ so that we could reuse this test and run it against both 
implementations, but it got out of hand.
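
   For what it's worth, a rough sketch of that idea, parameterizing over whether `group.protocol=consumer` is set so each case exercises one implementation through the public `KafkaConsumer` facade (fixture details omitted; names and values are illustrative):

   ```
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testSimpleConsumption(boolean useNewProtocol) {
       Properties props = new Properties();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
       if (useNewProtocol)
           props.put("group.protocol", "consumer");
       // ... shared setup and assertions written purely against the Consumer API ...
   }
   ```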



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380662917


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2623 @@
+/*

Review Comment:
   Yes. The only changes are the removal of the extra constructors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380662469


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -108,13 +106,19 @@
 import static org.apache.kafka.common.utils.Utils.propsToMap;
 
 /**
- * This prototype consumer uses an {@link ApplicationEventHandler event 
handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be 
processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler 
event handler} to process
+ * {@link ApplicationEvent application events} so that the network I/O can be 
processed in a dedicated
  * {@link ConsumerNetworkThread network thread}. Visit
  * https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor;>this
 document
- * for detail implementation.
+ * for implementation detail.
+ *
+ * 
+ *
+ * Note: this {@link Consumer} implementation is part of the revised 
consumer group protocol from KIP-848.
+ * This class should not be invoked directly; users should instead create a 
{@link KafkaConsumer} as before.
+ * This consumer implements the new consumer group protocol and is intended to 
be the default in coming releases.
  */
-public class PrototypeAsyncConsumer implements Consumer {
+public class AsyncKafkaConsumer implements Consumer {

Review Comment:
   Done.
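
   To make the facade behaviour concrete, a minimal usage sketch (broker address, group id and topic are placeholders; the `group.protocol` value is taken from the javadoc above):

   ```
   Properties props = new Properties();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
   // Opting into the KIP-848 protocol selects the async delegate under the covers;
   // leaving group.protocol unset keeps the legacy implementation.
   props.put("group.protocol", "consumer");

   // Callers still construct the public facade and code against the Consumer API only.
   try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
       consumer.subscribe(Collections.singletonList("my-topic"));
       consumer.poll(Duration.ofMillis(100));
   }
   ```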



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -148,30 +152,38 @@ public class PrototypeAsyncConsumer implements 
Consumer {
 private boolean cachedSubscriptionHasAllFetchPositions;
 private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
 
-public PrototypeAsyncConsumer(final Properties properties,
-  final Deserializer keyDeserializer,
-  final Deserializer valueDeserializer) {
+public AsyncKafkaConsumer(Map configs) {

Review Comment:
   Yep-done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380661924


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");
+}
+
+public  Consumer create(Map configs) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs);
+} else {
+return new LegacyKafkaConsumer<>(configs);
+}
+});
+}
+
+public  Consumer create(Properties properties) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties);
+} else {
+return new LegacyKafkaConsumer<>(properties);
+}
+});
+}
+
+public  Consumer create(Properties properties,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+}
+});
+}
+
+public  Consumer create(Map configs,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+}
+});
+}
+
+public  Consumer create(ConsumerConfig config,
+Deserializer keyDeserializer,
+Deserializer 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380661746


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {

Review Comment:
   I've refactored the code to use fewer constructors in the delegates and pass 
a fully-constructed `ConsumerConfig` to the delegates.
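
   As a rough sketch of the resulting shape (field names `CREATOR` and `delegate` are illustrative, and the conversion is shown in the facade here, though it could equally live in the creator): the Map/Properties-to-`ConsumerConfig` conversion happens once, and the delegates only ever see a parsed config plus deserializers.

   ```
   public KafkaConsumer(Map<String, Object> configs,
                        Deserializer<K> keyDeserializer,
                        Deserializer<V> valueDeserializer) {
       this(new ConsumerConfig(
                ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
            keyDeserializer,
            valueDeserializer);
   }

   KafkaConsumer(ConsumerConfig config,
                 Deserializer<K> keyDeserializer,
                 Deserializer<V> valueDeserializer) {
       // The creator picks LegacyKafkaConsumer or AsyncKafkaConsumer based on config.
       delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
   }
   ```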



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380611681


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");
+}
+
+public  Consumer create(Map configs) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs);
+} else {
+return new LegacyKafkaConsumer<>(configs);
+}
+});
+}
+
+public  Consumer create(Properties properties) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties);
+} else {
+return new LegacyKafkaConsumer<>(properties);
+}
+});
+}
+
+public  Consumer create(Properties properties,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+}
+});
+}
+
+public  Consumer create(Map configs,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+}
+});
+}
+
+public  Consumer create(ConsumerConfig config,
+Deserializer keyDeserializer,
+Deserializer 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380611443


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {

Review Comment:
   We could, yes. I will try to refactor to do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380610674


##
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##
@@ -281,7 +281,7 @@ private int 
maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-02 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1380062879


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -108,13 +106,19 @@
 import static org.apache.kafka.common.utils.Utils.propsToMap;
 
 /**
- * This prototype consumer uses an {@link ApplicationEventHandler event 
handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be 
processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler 
event handler} to process
+ * {@link ApplicationEvent application events} so that the network I/O can be 
processed in a dedicated
  * {@link ConsumerNetworkThread network thread}. Visit
  * https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor;>this
 document
- * for detail implementation.
+ * for implementation detail.
+ *
+ * 
+ *
+ * Note: this {@link Consumer} implementation is part of the revised 
consumer group protocol from KIP-848.
+ * This class should not be invoked directly; users should instead create a 
{@link KafkaConsumer} as before.
+ * This consumer implements the new consumer group protocol and is intended to 
be the default in coming releases.
  */
-public class PrototypeAsyncConsumer implements Consumer {
+public class AsyncKafkaConsumer implements Consumer {

Review Comment:
   I wonder if we should make it package private. Thoughts?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -148,30 +152,38 @@ public class PrototypeAsyncConsumer implements 
Consumer {
 private boolean cachedSubscriptionHasAllFetchPositions;
 private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
 
-public PrototypeAsyncConsumer(final Properties properties,
-  final Deserializer keyDeserializer,
-  final Deserializer valueDeserializer) {
+public AsyncKafkaConsumer(Map configs) {

Review Comment:
   I really wonder if we need all those constructors now that we have the 
delegate creator. For instance, would it be possible to only keep the following 
ones and keep all the conversion to `ConsumerConfig` in the delegate creator?
   ```
   public AsyncKafkaConsumer(final ConsumerConfig config,
                             final Deserializer<K> keyDeserializer,
                             final Deserializer<V> valueDeserializer) {
       this(Time.SYSTEM, config, keyDeserializer, valueDeserializer);
   }

   public AsyncKafkaConsumer(final Time time,
                             final ConsumerConfig config,
                             final Deserializer<K> keyDeserializer,
                             final Deserializer<V> valueDeserializer) {
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-01 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1379033359


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer 
timer) {
 }
 }
 
-// This is here temporary as we don't have public access to the 
ConsumerConfig in this module.
-public static Map appendDeserializerToConfig(Map configs,
- 
Deserializer keyDeserializer,
- 
Deserializer valueDeserializer) {
-// validate deserializer configuration, if the passed deserializer 
instance is null, the user must explicitly set a valid deserializer 
configuration value
-Map newConfigs = new HashMap<>(configs);
-if (keyDeserializer != null)
-newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer.getClass());
-else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, 
"must be non-null.");
-if (valueDeserializer != null)
-newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer.getClass());
-else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, 
"must be non-null.");
-return newConfigs;
-}
-

Review Comment:
   Great find!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-01 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1379033013


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -278,22 +283,22 @@ public PrototypeAsyncConsumer(final Time time,
 }
 }
 
-public PrototypeAsyncConsumer(LogContext logContext,
-  String clientId,
-  Deserializers deserializers,
-  FetchBuffer fetchBuffer,
-  FetchCollector fetchCollector,
-  ConsumerInterceptors interceptors,
-  Time time,
-  ApplicationEventHandler 
applicationEventHandler,
-  BlockingQueue 
backgroundEventQueue,
-  Metrics metrics,
-  SubscriptionState subscriptions,
-  ConsumerMetadata metadata,
-  long retryBackoffMs,
-  int defaultApiTimeoutMs,
-  List assignors,
-  String groupId) {
+public AsyncKafkaConsumer(LogContext logContext,

Review Comment:
   I don't remember if this constructor is used for testing; if it is, can we change the scope to package-private?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378285408


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -108,13 +105,13 @@
 import static org.apache.kafka.common.utils.Utils.propsToMap;
 
 /**
- * This prototype consumer uses an {@link ApplicationEventHandler event 
handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be 
processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler 
event handler} to process

Review Comment:
   Done. If you want the verbiage changed in some fashion, just let me know. 
Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378285238


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static org.apache.kafka.common.utils.Utils.join;
+import static org.apache.kafka.common.utils.Utils.propsToMap;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378277149


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -670,168 +582,11 @@ public KafkaConsumer(Properties properties,
 public KafkaConsumer(Map configs,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer) {
-this(new 
ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, 
keyDeserializer, valueDeserializer)),
-keyDeserializer, valueDeserializer);
+delegate = CREATOR.create(configs, keyDeserializer, valueDeserializer);
 }
 
-@SuppressWarnings("unchecked")
 KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {
-try {
-GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(config,
-GroupRebalanceConfig.ProtocolType.CONSUMER);
-
-this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
-this.clientId = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-LogContext logContext = createLogContext(config, 
groupRebalanceConfig);
-this.log = logContext.logger(getClass());
-boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
-groupId.ifPresent(groupIdStr -> {
-if (groupIdStr.isEmpty()) {
-log.warn("Support for using the empty group id by 
consumers is deprecated and will be removed in the next major release.");
-}
-});
-
-log.debug("Initializing the Kafka consumer");
-this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-this.time = Time.SYSTEM;
-this.metrics = createMetrics(config, time);
-this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
-
-List> interceptorList = 
configuredConsumerInterceptors(config);
-this.interceptors = new ConsumerInterceptors<>(interceptorList);
-this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer);
-this.subscriptions = createSubscriptionState(config, logContext);
-ClusterResourceListeners clusterResourceListeners = 
ClientUtils.configureClusterResourceListeners(
-metrics.reporters(),
-interceptorList,
-Arrays.asList(this.deserializers.keyDeserializer, 
this.deserializers.valueDeserializer));
-this.metadata = new ConsumerMetadata(config, subscriptions, 
logContext, clusterResourceListeners);
-List addresses = 
ClientUtils.parseAndValidateAddresses(config);
-this.metadata.bootstrap(addresses);
-
-FetchMetricsManager fetchMetricsManager = 
createFetchMetricsManager(metrics);
-FetchConfig fetchConfig = new FetchConfig(config);
-this.isolationLevel = fetchConfig.isolationLevel;
-
-ApiVersions apiVersions = new ApiVersions();
-this.client = createConsumerNetworkClient(config,
-metrics,
-logContext,
-apiVersions,
-time,
-metadata,
-fetchMetricsManager.throttleTimeSensor(),
-retryBackoffMs);
-
-this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
-
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId))
-);
-
-// no coordinator will be constructed for the default (null) group 
id
-if (!groupId.isPresent()) {
-config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-
config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
-this.coordinator = null;
-} else {
-this.coordinator = new 
ConsumerCoordinator(groupRebalanceConfig,
-logContext,
-this.client,
-assignors,
-this.metadata,
-this.subscriptions,
-metrics,
-CONSUMER_METRIC_GROUP_PREFIX,
-this.time,
-enableAutoCommit,
-
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-this.interceptors,
-

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378276620


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -108,13 +105,13 @@
 import static org.apache.kafka.common.utils.Utils.propsToMap;
 
 /**
- * This prototype consumer uses an {@link ApplicationEventHandler event 
handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be 
processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler 
event handler} to process

Review Comment:
   Will do.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378276022


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -211,4 +226,17 @@ else if (t instanceof KafkaException)
 throw new TimeoutException(e);
 }
 }
+
+public static boolean maybeOverrideEnableAutoCommit(ConsumerConfig config) 
{
+Optional groupId = 
Optional.ofNullable(config.getString(CommonClientConfigs.GROUP_ID_CONFIG));
+boolean enableAutoCommit = 
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+if (!groupId.isPresent()) { // overwrite in case of default group id 
where the config is not explicitly provided

Review Comment:
   I didn't add it in this PR, no. The Jira refers to doing that in the "next 
major release," so that would imply we should hold off and handle it outside 
this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378275567


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");
+}
+
+public  Consumer create(Map configs) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs);
+} else {
+return new LegacyKafkaConsumer<>(configs);
+}
+});
+}
+
+public  Consumer create(Properties properties) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties);
+} else {
+return new LegacyKafkaConsumer<>(properties);
+}
+});
+}
+
+public  Consumer create(Properties properties,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+}
+});
+}
+
+public  Consumer create(Map configs,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+}
+});
+}
+
+public  Consumer create(ConsumerConfig config,
+Deserializer keyDeserializer,
+Deserializer 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378275212


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");
+}
+
+public  Consumer create(Map configs) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs);
+} else {
+return new LegacyKafkaConsumer<>(configs);
+}
+});
+}
+
+public  Consumer create(Properties properties) {

Review Comment:
   I wanted a specific constructor in `KafkaConsumer` to invoke the matching 
specific constructor of the delegate class. That way there's no weird 
manipulation/trickery in case the delegate constructors do things subtly 
differently.
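
   For illustration, here is a minimal sketch of that one-to-one mapping, pieced 
together from the constructors quoted in this thread. The `CREATOR` field and the 
class name are assumptions for the sketch; this is not a verbatim copy of the 
merged code.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch only: each public constructor hands its arguments, unchanged, to the matching
// ConsumerDelegateCreator.create(...) overload, which in turn calls the matching delegate
// constructor. No argument reshuffling happens along the way.
public class KafkaConsumerFacadeSketch<K, V> {

    private static final ConsumerDelegateCreator CREATOR = new ConsumerDelegateCreator();

    private final Consumer<K, V> delegate;

    public KafkaConsumerFacadeSketch(Properties properties) {
        // Properties constructor -> create(Properties) -> delegate's Properties constructor
        delegate = CREATOR.create(properties);
    }

    public KafkaConsumerFacadeSketch(Map<String, Object> configs,
                                     Deserializer<K> keyDeserializer,
                                     Deserializer<V> valueDeserializer) {
        // Map + deserializers -> create(Map, Deserializer, Deserializer) -> matching delegate constructor
        delegate = CREATOR.create(configs, keyDeserializer, valueDeserializer);
    }
}
```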



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378274154


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {

Review Comment:
   `useNewConsumer` was written to take a generic `Map` so that it can be 
called from all the constructors, including the ones that don't use 
`ConsumerConfig`.
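
   To make the selection concrete, here is a hedged usage example of the 
`group.protocol` switch that `useNewConsumer` inspects. The broker address, group 
id, and deserializer choice below are placeholders, not values from the PR.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupProtocolExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // "group.protocol" is the value useNewConsumer() looks at: "consumer" selects the
        // KIP-848 AsyncKafkaConsumer; any other value, or leaving it unset, falls back to
        // LegacyKafkaConsumer.
        props.put("group.protocol", "consumer");

        // Callers only ever see the KafkaConsumer facade and the Consumer API; which
        // delegate was chosen stays an internal detail.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe()/poll() as usual
        }
    }
}
```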



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


dajac commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1788029739

   @kirktrue Noted. I will be away tomorrow, but I can take a look at it when 
I come back if nobody gets to it first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377954063


##
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java:
##
@@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws 
Throwable {
 () -> log.info("offsetsForTime = {}", offsetsForTime.result));
 // Whether or not offsetsForTimes works, beginningOffsets and 
endOffsets
 // should work.
-consumer.beginningOffsets(timestampsToSearch.keySet());
-consumer.endOffsets(timestampsToSearch.keySet());
+Map beginningOffsets = 
consumer.beginningOffsets(timestampsToSearch.keySet());
+Map endingOffsets = 
consumer.endOffsets(timestampsToSearch.keySet());
+log.trace("beginningOffsets: {}, endingOffsets: {}", 
beginningOffsets, endingOffsets);

Review Comment:
   I find SpotBugs flaky. Sometimes it complains about a bunch of stuff I 
never touched...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377942074


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -211,4 +226,17 @@ else if (t instanceof KafkaException)
 throw new TimeoutException(e);
 }
 }
+
+public static boolean maybeOverrideEnableAutoCommit(ConsumerConfig config) 
{
+Optional groupId = 
Optional.ofNullable(config.getString(CommonClientConfigs.GROUP_ID_CONFIG));
+boolean enableAutoCommit = 
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+if (!groupId.isPresent()) { // overwrite in case of default group id 
where the config is not explicitly provided

Review Comment:
   Did we add a check to invalidate an empty groupId? 
https://issues.apache.org/jira/browse/KAFKA-14438



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377942677


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##
@@ -211,4 +226,17 @@ else if (t instanceof KafkaException)
 throw new TimeoutException(e);
 }
 }
+
+public static boolean maybeOverrideEnableAutoCommit(ConsumerConfig config) 
{
+Optional groupId = 
Optional.ofNullable(config.getString(CommonClientConfigs.GROUP_ID_CONFIG));
+boolean enableAutoCommit = 
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+if (!groupId.isPresent()) { // overwrite in case of default group id 
where the config is not explicitly provided

Review Comment:
   I was also wondering if this should happen in the ConsumerConfig.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377938898


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -108,13 +105,13 @@
 import static org.apache.kafka.common.utils.Utils.propsToMap;
 
 /**
- * This prototype consumer uses an {@link ApplicationEventHandler event 
handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be 
processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler 
event handler} to process

Review Comment:
   Similarly, can we add a description stating that this module is used to support 
the KIP-848 protocol?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377937716


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static org.apache.kafka.common.utils.Utils.join;
+import static org.apache.kafka.common.utils.Utils.propsToMap;
+import static 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377933810


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");
+}
+
+public  Consumer create(Map configs) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs);
+} else {
+return new LegacyKafkaConsumer<>(configs);
+}
+});
+}
+
+public  Consumer create(Properties properties) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties);
+} else {
+return new LegacyKafkaConsumer<>(properties);
+}
+});
+}
+
+public  Consumer create(Properties properties,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(properties)) {
+return new AsyncKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(properties, keyDeserializer, 
valueDeserializer);
+}
+});
+}
+
+public  Consumer create(Map configs,
+Deserializer keyDeserializer,
+Deserializer valueDeserializer) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+} else {
+return new LegacyKafkaConsumer<>(configs, keyDeserializer, 
valueDeserializer);
+}
+});
+}
+
+public  Consumer create(ConsumerConfig config,
+Deserializer keyDeserializer,
+Deserializer 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377930480


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {
+Object groupProtocol = configs.get("group.protocol");
+
+// Takes care of both the null and type checks.
+if (!(groupProtocol instanceof String))
+return false;
+
+return ((String) groupProtocol).equalsIgnoreCase("consumer");
+}
+
+public  Consumer create(Map configs) {
+return createInternal(() -> {
+if (useNewConsumer(configs)) {
+return new AsyncKafkaConsumer<>(configs);
+} else {
+return new LegacyKafkaConsumer<>(configs);
+}
+});
+}
+
+public  Consumer create(Properties properties) {

Review Comment:
   Could use `propsToMap(properties)` to invoke 
`public <K, V> Consumer<K, V> create(Map<String, Object> configs)`.
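
   A minimal sketch of that suggestion, relying only on the `Utils.propsToMap` 
helper that the quoted imports already reference; the class name and method bodies 
are placeholders, not the merged code.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;

import static org.apache.kafka.common.utils.Utils.propsToMap;

// Hypothetical shape of ConsumerDelegateCreator if the Properties overload reused the
// Map overload: the group.protocol check and the Async/Legacy branching then live in
// exactly one place.
public class ConsumerDelegateCreatorSketch {

    public <K, V> Consumer<K, V> create(Map<String, Object> configs) {
        // the single copy of the useNewConsumer(configs) branching would go here
        throw new UnsupportedOperationException("sketch only");
    }

    public <K, V> Consumer<K, V> create(Properties properties) {
        return create(propsToMap(properties));   // convert once, then reuse the Map overload
    }
}
```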



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377928108


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow 
the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides 
the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations, but allow different 
implementations to co-exist under
+ * the covers.
+ *
+ * 
+ *
+ * The current logic for the {@code ConsumerCreator} inspects the incoming 
configuration and determines if
+ * it is using the new KIP-848 consumer protocol or if it should fall back to 
the existing, legacy group protocol.
+ * This is based on the presence and value of the {@code group.protocol} 
configuration value. If the value is present
+ * and equals {@code consumer}, the {@link AsyncKafkaConsumer} will be 
returned. Otherwise, the
+ * {@link LegacyKafkaConsumer} will be returned.
+ *
+ * 
+ *
+ * This is not to be called by end users and callers should not attempt to 
determine the underlying implementation
+ * as this will make such code very brittle. Users of this facility should 
honor the top-level {@link Consumer} API
+ * contract as-is.
+ */
+public class ConsumerDelegateCreator {
+
+/**
+ * This is it! This is the core logic. It's extremely rudimentary.
+ */
+private static boolean useNewConsumer(Map configs) {

Review Comment:
   The configuration is validated in the ConsumerConfig.java constructor; I 
wonder if we could directly invoke 
`KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)` 
to avoid double validation.
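
   For reference, a rough sketch of the single-validation flow being suggested. The 
`CREATOR` call matches the ConsumerConfig-based factory overload quoted earlier in 
this thread, but the exact wiring and class name are assumptions, not the merged 
code.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical facade fragment: the Map-based constructor builds the ConsumerConfig once
// and chains to the ConsumerConfig-based constructor, which hands the already-validated
// config straight to the delegate factory, so no second parse/validation happens.
// (The real facade also appends the deserializer classes to the map before this step.)
public class SingleValidationSketch<K, V> {

    private static final ConsumerDelegateCreator CREATOR = new ConsumerDelegateCreator();

    private final Consumer<K, V> delegate;

    public SingleValidationSketch(Map<String, Object> configs,
                                  Deserializer<K> keyDeserializer,
                                  Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(configs), keyDeserializer, valueDeserializer); // validated once, here
    }

    SingleValidationSketch(ConsumerConfig config,
                           Deserializer<K> keyDeserializer,
                           Deserializer<V> valueDeserializer) {
        this.delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
    }
}
```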



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377919975


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -670,168 +582,11 @@ public KafkaConsumer(Properties properties,
 public KafkaConsumer(Map configs,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer) {
-this(new 
ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, 
keyDeserializer, valueDeserializer)),
-keyDeserializer, valueDeserializer);
+delegate = CREATOR.create(configs, keyDeserializer, valueDeserializer);
 }
 
-@SuppressWarnings("unchecked")
 KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {
-try {
-GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(config,
-GroupRebalanceConfig.ProtocolType.CONSUMER);
-
-this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
-this.clientId = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-LogContext logContext = createLogContext(config, 
groupRebalanceConfig);
-this.log = logContext.logger(getClass());
-boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
-groupId.ifPresent(groupIdStr -> {
-if (groupIdStr.isEmpty()) {
-log.warn("Support for using the empty group id by 
consumers is deprecated and will be removed in the next major release.");
-}
-});
-
-log.debug("Initializing the Kafka consumer");
-this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-this.time = Time.SYSTEM;
-this.metrics = createMetrics(config, time);
-this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
-
-List> interceptorList = 
configuredConsumerInterceptors(config);
-this.interceptors = new ConsumerInterceptors<>(interceptorList);
-this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer);
-this.subscriptions = createSubscriptionState(config, logContext);
-ClusterResourceListeners clusterResourceListeners = 
ClientUtils.configureClusterResourceListeners(
-metrics.reporters(),
-interceptorList,
-Arrays.asList(this.deserializers.keyDeserializer, 
this.deserializers.valueDeserializer));
-this.metadata = new ConsumerMetadata(config, subscriptions, 
logContext, clusterResourceListeners);
-List addresses = 
ClientUtils.parseAndValidateAddresses(config);
-this.metadata.bootstrap(addresses);
-
-FetchMetricsManager fetchMetricsManager = 
createFetchMetricsManager(metrics);
-FetchConfig fetchConfig = new FetchConfig(config);
-this.isolationLevel = fetchConfig.isolationLevel;
-
-ApiVersions apiVersions = new ApiVersions();
-this.client = createConsumerNetworkClient(config,
-metrics,
-logContext,
-apiVersions,
-time,
-metadata,
-fetchMetricsManager.throttleTimeSensor(),
-retryBackoffMs);
-
-this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
-
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId))
-);
-
-// no coordinator will be constructed for the default (null) group 
id
-if (!groupId.isPresent()) {
-config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-
config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
-this.coordinator = null;
-} else {
-this.coordinator = new 
ConsumerCoordinator(groupRebalanceConfig,
-logContext,
-this.client,
-assignors,
-this.metadata,
-this.subscriptions,
-metrics,
-CONSUMER_METRIC_GROUP_PREFIX,
-this.time,
-enableAutoCommit,
-
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-this.interceptors,
-

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


philipnee commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377917485


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -670,168 +582,11 @@ public KafkaConsumer(Properties properties,
 public KafkaConsumer(Map configs,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer) {
-this(new 
ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, 
keyDeserializer, valueDeserializer)),
-keyDeserializer, valueDeserializer);
+delegate = CREATOR.create(configs, keyDeserializer, valueDeserializer);
 }
 
-@SuppressWarnings("unchecked")
 KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, 
Deserializer valueDeserializer) {
-try {
-GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(config,
-GroupRebalanceConfig.ProtocolType.CONSUMER);
-
-this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
-this.clientId = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-LogContext logContext = createLogContext(config, 
groupRebalanceConfig);
-this.log = logContext.logger(getClass());
-boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
-groupId.ifPresent(groupIdStr -> {
-if (groupIdStr.isEmpty()) {
-log.warn("Support for using the empty group id by 
consumers is deprecated and will be removed in the next major release.");
-}
-});
-
-log.debug("Initializing the Kafka consumer");
-this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-this.time = Time.SYSTEM;
-this.metrics = createMetrics(config, time);
-this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
-
-List> interceptorList = 
configuredConsumerInterceptors(config);
-this.interceptors = new ConsumerInterceptors<>(interceptorList);
-this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer);
-this.subscriptions = createSubscriptionState(config, logContext);
-ClusterResourceListeners clusterResourceListeners = 
ClientUtils.configureClusterResourceListeners(
-metrics.reporters(),
-interceptorList,
-Arrays.asList(this.deserializers.keyDeserializer, 
this.deserializers.valueDeserializer));
-this.metadata = new ConsumerMetadata(config, subscriptions, 
logContext, clusterResourceListeners);
-List addresses = 
ClientUtils.parseAndValidateAddresses(config);
-this.metadata.bootstrap(addresses);
-
-FetchMetricsManager fetchMetricsManager = 
createFetchMetricsManager(metrics);
-FetchConfig fetchConfig = new FetchConfig(config);
-this.isolationLevel = fetchConfig.isolationLevel;
-
-ApiVersions apiVersions = new ApiVersions();
-this.client = createConsumerNetworkClient(config,
-metrics,
-logContext,
-apiVersions,
-time,
-metadata,
-fetchMetricsManager.throttleTimeSensor(),
-retryBackoffMs);
-
-this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
-
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId))
-);
-
-// no coordinator will be constructed for the default (null) group 
id
-if (!groupId.isPresent()) {
-config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-
config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
-this.coordinator = null;
-} else {
-this.coordinator = new 
ConsumerCoordinator(groupRebalanceConfig,
-logContext,
-this.client,
-assignors,
-this.metadata,
-this.subscriptions,
-metrics,
-CONSUMER_METRIC_GROUP_PREFIX,
-this.time,
-enableAutoCommit,
-
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-this.interceptors,
-

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1787588430

   @dajac This is ready for your review. The test failures are unrelated. 
Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-31 Thread via GitHub


dajac commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1377343382


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
  */
 static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
"internal.leave.group.on.close";
 
-/**
- * internal.throw.on.fetch.stable.offset.unsupported
- * Whether or not the consumer should throw when the new stable offset 
feature is supported.
- * If set to true then the client shall crash upon hitting it.
- * The purpose of this flag is to prevent unexpected broker downgrade 
which makes
- * the offset fetch protection against pending commit invalid. The safest 
approach
- * is to fail fast to avoid introducing correctness issue.
- *
- * 
- * Note: this is an internal configuration and could be changed in the 
future in a backward incompatible way
- *
- */
-static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = 
"internal.throw.on.fetch.stable.offset.unsupported";
-

Review Comment:
   If it is not public, it is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-30 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1376925327


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
  */
 static final String LEAVE_GROUP_ON_CLOSE_CONFIG = 
"internal.leave.group.on.close";
 
-/**
- * internal.throw.on.fetch.stable.offset.unsupported
- * Whether or not the consumer should throw when the new stable offset 
feature is supported.
- * If set to true then the client shall crash upon hitting it.
- * The purpose of this flag is to prevent unexpected broker downgrade 
which makes
- * the offset fetch protection against pending commit invalid. The safest 
approach
- * is to fail fast to avoid introducing correctness issue.
- *
- * 
- * Note: this is an internal configuration and could be changed in the 
future in a backward incompatible way
- *
- */
-static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = 
"internal.throw.on.fetch.stable.offset.unsupported";
-

Review Comment:
   I moved this to `ConsumerUtils` because `LegacyKafkaConsumer` and 
`AsyncKafkaConsumer` couldn't access it via package-level visibility since 
they're in the `internals` sub-package.
   
   @dajac—will this be considered a breaking change? I assumed not because 
`THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` isn't public.
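   For reference, a minimal sketch of what the relocated constant could look 
like in `ConsumerUtils` (the class shape and `public` visibility shown here are 
assumptions for illustration, not the merged code; the key string is the one 
quoted in the removed Javadoc above):

    // Sketch only: assumes the constant keeps the same name and string value
    // after the move out of ConsumerConfig.
    package org.apache.kafka.clients.consumer.internals;

    public final class ConsumerUtils {

        // Same internal config key that ConsumerConfig previously declared.
        public static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED =
            "internal.throw.on.fetch.stable.offset.unsupported";

        private ConsumerUtils() { }
    }

   Both `LegacyKafkaConsumer` and `AsyncKafkaConsumer` can then reference it as 
`ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` without needing 
package-private access to `ConsumerConfig`.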



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -250,7 +255,7 @@ public PrototypeAsyncConsumer(final Time time,
 // no coordinator will be constructed for the default (null) group 
id
 if (!groupId.isPresent()) {
 config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-
//config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+
config.ignore(ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);

Review Comment:
   Moving the package-level variable and method from `ConsumerConfig` also 
fixes this issue.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer 
timer) {
 }
 }
 
-// This is here temporary as we don't have public access to the ConsumerConfig in this module.
-public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
- Deserializer<?> keyDeserializer,
- Deserializer<?> valueDeserializer) {
-// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
-Map<String, Object> newConfigs = new HashMap<>(configs);
-if (keyDeserializer != null)
-newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
-else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-if (valueDeserializer != null)
-newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
-else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-return newConfigs;
-}
-

Review Comment:
   The original version of this method, as it appeared in `ConsumerConfig`, was 
moved to `ConsumerUtils`, so now it can be used from here too.
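   For illustration, a rough usage sketch of the helper after the move. It 
assumes the method keeps its original public static signature under 
`ConsumerUtils`; the wrapper class and config values below are made up for the 
example:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class AppendDeserializerExample {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<>();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            Deserializer<String> keyDeserializer = new StringDeserializer();
            Deserializer<String> valueDeserializer = new StringDeserializer();

            // The helper adds (or validates) the key/value deserializer class
            // entries before a ConsumerConfig is built from the merged map.
            Map<String, Object> merged =
                ConsumerUtils.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer);
            ConsumerConfig config = new ConsumerConfig(merged);

            System.out.println("Key deserializer: "
                + config.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
        }
    }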



##
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java:
##
@@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws 
Throwable {
 () -> log.info("offsetsForTime = {}", offsetsForTime.result));
 // Whether or not offsetsForTimes works, beginningOffsets and 
endOffsets
 // should work.
-consumer.beginningOffsets(timestampsToSearch.keySet());
-consumer.endOffsets(timestampsToSearch.keySet());
+Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(timestampsToSearch.keySet());
+Map<TopicPartition, Long> endingOffsets = consumer.endOffsets(timestampsToSearch.keySet());
+log.trace("beginningOffsets: {}, endingOffsets: {}", beginningOffsets, endingOffsets);

Review Comment:
   This is super annoying.
   
   I started getting errors from SpotBugs because the offsets methods were 
called but their return values were ignored. This is a very brute-force way of 
silencing the checker; I could not find a clean way to suppress the warning.
   
   I also don't know why this is suddenly being caught by SpotBugs. From its 
perspective, nothing has changed in the `KafkaConsumer` API, right?
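   For what it's worth, one possible cleaner alternative would be to suppress 
the specific bug pattern on the affected method instead of logging the 
otherwise-unused values. The sketch below assumes the `spotbugs-annotations` 
dependency is available to the tools module and that the reported pattern is 
the return-value-ignored one; both are assumptions that would need to be 
checked against the actual build output:

    import java.util.Set;

    import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetProbeSketch {

        // Sketch only: the bug pattern name and the choice to annotate (rather
        // than extend a SpotBugs exclusion file) are assumptions, not this PR.
        @SuppressFBWarnings(
            value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
            justification = "Only verifying that the calls complete without throwing")
        void probeOffsets(Consumer<byte[], byte[]> consumer, Set<TopicPartition> partitions) {
            // Return values intentionally ignored.
            consumer.beginningOffsets(partitions);
            consumer.endOffsets(partitions);
        }
    }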



