[jira] [Resolved] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16160. Resolution: Cannot Reproduce > AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Observed excessive logging when running AsyncKafkaConsumer: > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)], > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop of the network thread. The > right thing to do is to back off a bit for that given node and retry later. > This should be a blocker for the 3.8 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery
Philip Nee created KAFKA-16819: -- Summary: CoordinatorRequestManager seems to return 0ms during the coordinator discovery Key: KAFKA-16819 URL: https://issues.apache.org/jira/browse/KAFKA-16819 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee In KAFKA-15250 we discovered that the ConsumerNetworkThread is looping without much backoff. The in-flight check PR fixed a lot of it; however, during the coordinator discovery phase, CoordinatorRequestManager would keep returning 0 ms before the coordinator node was found. The impact is minor, but we should expect the coordinator request manager to back off until the exponential backoff expires (so it should return around 100 ms). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found
Philip Nee created KAFKA-16799: -- Summary: NetworkClientDelegate is not backing off if the node is not found Key: KAFKA-16799 URL: https://issues.apache.org/jira/browse/KAFKA-16799 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee When performing stress testing, I found that AsyncKafkaConsumer's network client delegate isn't backing off if the node is not ready, causing a large number of: {code:java} 358 [2024-05-20 22:59:02,591] DEBUG [Consumer clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, groupId=consumer-groups-test-5] Node is not ready, handle the request in the next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null), request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=10, subscribedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, topicPartitions=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, node=Optional[b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null)], timer=org.apache.kafka.common.utils.Timer@649fffad} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code} show up in the log. What should have happened is: 1. the node is not ready, 2. exponential back-off, 3. retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
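The expected behavior (back off per node instead of re-polling immediately) could look roughly like the following sketch. This is only an illustration of exponential backoff with a cap; the class and method names are invented and are not the actual NetworkClientDelegate code.

```java
// Hypothetical per-node backoff tracker: each failed readiness check doubles
// the wait before the next connection attempt, up to a maximum.
class NodeBackoff {
    private final long initialMs;
    private final long maxMs;
    private int attempts = 0;

    NodeBackoff(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Delay to wait before the next attempt to reach this node. */
    long nextDelayMs() {
        long exp = Math.min(attempts, 20);              // cap the shift to avoid overflow
        long delay = Math.min(maxMs, initialMs << exp); // 100, 200, 400, ... up to maxMs
        attempts++;
        return delay;
    }

    /** Reset once the node becomes ready again. */
    void reset() {
        attempts = 0;
    }
}
```

With an initial delay of 100 ms and a 1000 ms cap, the sequence of delays is 100, 200, 400, 800, then 1000 thereafter until a successful connection resets it.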
[jira] [Created] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition
Philip Nee created KAFKA-16778: -- Summary: AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition Key: KAFKA-16778 URL: https://issues.apache.org/jira/browse/KAFKA-16778 Project: Kafka Issue Type: Bug Reporter: Philip Nee {code:java} java.lang.IllegalStateException: No current assignment for partition output-topic-26 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369) at org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542) at org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411) at org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94) {code} The 
setup is: running 30 consumers consuming from a 300-partition topic. We can occasionally get an IllegalStateException from the consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
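One way to avoid the IllegalStateException in the stack trace above is to re-check the current assignment immediately before building the fetch request, since a concurrent rebalance can revoke a partition in between listing fetchable partitions and asking for their positions. The sketch below is a simplified illustration with invented types, not the real AbstractFetch logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical guard: keep only partitions that are still assigned at
// request-build time, instead of assuming the earlier snapshot is valid.
class FetchGuard {
    static List<String> prepareFetchable(List<String> candidates, Set<String> currentAssignment) {
        List<String> fetchable = new ArrayList<>();
        for (String tp : candidates) {
            // Skipping a revoked partition here avoids the
            // "No current assignment for partition ..." IllegalStateException.
            if (currentAssignment.contains(tp)) {
                fetchable.add(tp);
            }
        }
        return fetchable;
    }
}
```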
[jira] [Resolved] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16474. Resolution: Fixed Ran the test several times; the client log also looks fine. > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: failing_results.zip > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its just-assigned partitions > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus has not updated its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all of > its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: In KAFKA-16389 we've observed the consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
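The "inflight logic" fix described above amounts to refusing to send a new heartbeat while the previous one is still awaiting its response, so the member's epoch and memberId are always updated before the next send. A minimal sketch of such a guard (invented names, not the actual heartbeat request manager):

```java
// Hypothetical in-flight guard for a single outstanding heartbeat request.
class HeartbeatState {
    private boolean inFlight = false;

    /** Called on each network-thread poll; returns true if a heartbeat may be sent now. */
    boolean maybeSend() {
        if (inFlight) {
            return false; // wait for the outstanding response before sending again
        }
        inFlight = true;
        return true;
    }

    /** Called when the heartbeat response (or failure) arrives. */
    void onResponse() {
        inFlight = false;
    }
}
```

With this guard, the second epoch=0/memberId='' heartbeat from the scenario above would never be sent, so the server would not hand back an empty assignment.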
[jira] [Resolved] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16579. Resolution: Fixed > Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer > - > > Key: KAFKA-16579 > URL: https://issues.apache.org/jira/browse/KAFKA-16579 > Project: Kafka > Issue Type: Task > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated > a slew of system tests to run both the "old" and "new" implementations. > KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} > so it could test the new consumer. However, the test is tailored specifically > to the "old" Consumer's protocol and assignment strategy upgrade. > Unsurprisingly, when we run those system tests with the new > {{AsyncKafkaConsumer}}, we get errors like the following: > {code} > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 29.634 seconds > AssertionError("Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1)})}") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 77, in rolling_update_test > self._verify_range_assignment(consumer) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 41, in _verify_range_assignment > "Mismatched assignment: %s" % assignment > AssertionError: Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1)})} > {code} > The task here is to revert the changes made in KAFKA-16271. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16389. Resolution: Fixed > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch, consumer.log > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when > num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])] > {code} > To reproduce, create a system test suite file named > {{test_valid_assignment.yml}} with these contents: > {code:yaml} > failures: > - > 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}' > {code} > Then set the {{TC_PATHS}} environment variable to include that test suite > file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16474) AsyncKafkaConsumer might rapidly send out successive heartbeats, causing partitions to get revoked
Philip Nee created KAFKA-16474: -- Summary: AsyncKafkaConsumer might rapidly send out successive heartbeats, causing partitions to get revoked Key: KAFKA-16474 URL: https://issues.apache.org/jira/browse/KAFKA-16474 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee KAFKA-16389 We've discovered that in some uncommon cases, the consumer could send out successive heartbeats without waiting for the response to come back. This might cause the consumer to revoke its just-assigned partitions in some cases. For example: The consumer first sends out a heartbeat with epoch=0 and memberId=''. The consumer then rapidly sends out another heartbeat with epoch=0 and memberId='' because it has not gotten any response and thus has not updated its local state. The consumer receives assignments from the first heartbeat and reconciles its assignment. Since the second heartbeat has epoch=0 and memberId='', the server will think this is a new member joining and therefore send out an empty assignment. The consumer receives the response from the second heartbeat and revokes all of its partitions. There are 2 issues associated with this bug: # inflight logic # rapid poll: In KAFKA-16389 we've observed the consumer polling interval to be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout
Philip Nee created KAFKA-16433: -- Summary: beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout Key: KAFKA-16433 URL: https://issues.apache.org/jira/browse/KAFKA-16433 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee As documented here: [https://github.com/apache/kafka/pull/15525] Both APIs should at least send out a request when a zero timeout is provided. This is corrected in the PR above. However, we still need to fix the implementation of the offsetsForTimes API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
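The agreed behavior (send the request even when the timeout is zero, and simply return without waiting for the response) can be sketched as follows. The class and counter below are hypothetical stand-ins for the consumer's internals, not the real beginningOffsets implementation.

```java
// Hypothetical model of the zero-timeout contract: the lookup always issues
// the request; the timeout only bounds how long we wait for the response.
class OffsetLookupSketch {
    private int requestsSent = 0;

    /** Returns true if a response was awaited and received within the timeout. */
    boolean beginningOffsets(long timeoutMs) {
        requestsSent++;       // the request is sent regardless of the timeout
        // With a zero timeout we return immediately; the request is still in flight.
        return timeoutMs > 0; // stand-in for "response arrived in time"
    }

    int requestsSent() {
        return requestsSent;
    }
}
```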
[jira] [Created] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests
Philip Nee created KAFKA-16405: -- Summary: Mismatch assignment error when running consumer rolling upgrade system tests Key: KAFKA-16405 URL: https://issues.apache.org/jira/browse/KAFKA-16405 Project: Kafka Issue Type: Task Components: consumer, system tests Reporter: Philip Nee relevant to [https://github.com/apache/kafka/pull/15578] We are seeing: {code:java} SESSION REPORT (ALL TESTS) ducktape version: 0.11.4 session_id: 2024-03-21--001 run time: 3 minutes 24.632 seconds tests run:7 passed: 5 flaky:0 failed: 2 ignored: 0 test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic status: PASS run time: 24.599 seconds test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 26.638 seconds AssertionError("Mismatched assignment: {frozenset(), frozenset({TopicPartition(topic='test_topic', partition=3), TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1), TopicPartition(topic='test_topic', partition=2)})}") Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", line 77, in rolling_update_test self._verify_range_assignment(consumer) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", line 38, in _verify_range_assignment assert 
assignment == set([ AssertionError: Mismatched assignment: {frozenset(), frozenset({TopicPartition(topic='test_topic', partition=3), TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1), TopicPartition(topic='test_topic', partition=2)})} test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False status: PASS run time: 29.815 seconds test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True status: PASS run time: 29.766 seconds test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic status: PASS run time: 30.086 seconds test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 35.965 seconds AssertionError("Mismatched assignment: {frozenset(), frozenset({TopicPartition(topic='test_topic', partition=3), TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1), TopicPartition(topic='test_topic', partition=2)})}") Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", line 77, in rolling_update_test 
self._verify_range_assignment(consumer) File
[jira] [Created] (KAFKA-16390) consumer_bench_test.py failed using AsyncKafkaConsumer
Philip Nee created KAFKA-16390: -- Summary: consumer_bench_test.py failed using AsyncKafkaConsumer Key: KAFKA-16390 URL: https://issues.apache.org/jira/browse/KAFKA-16390 Project: Kafka Issue Type: Task Components: consumer, system tests Reporter: Philip Nee Ran the system test based on KAFKA-16273 The following tests failed using the consumer group protocol {code:java} kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer {code} Because of {code:java} TimeoutError('consume_workload failed to finish in the expected amount of time.') Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", line 146, in test_single_partition consume_workload.wait_for_done(timeout_sec=180) File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 352, in wait_for_done wait_until(lambda: self.done(), File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception ducktape.errors.TimeoutError: consume_workload failed to 
finish in the expected amount of time. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16116. Resolution: Fixed > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16115. Resolution: Fixed > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16113. Resolution: Fixed > AsyncKafkaConsumer: Add missing offset commit metrics > - > > Key: KAFKA-16113 > URL: https://issues.apache.org/jira/browse/KAFKA-16113 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The following metrics are missing from the AsyncKafkaConsumer: > commit-latency-avg > commit-latency-max > commit-rate > commit-total > committed-time-ns-total -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
Philip Nee created KAFKA-16160: -- Summary: AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop Key: KAFKA-16160 URL: https://issues.apache.org/jira/browse/KAFKA-16160 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Observed excessive logging when running AsyncKafkaConsumer: {code:java} 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, groupId=concurrent_consumer] Node is not ready, handle the request in the next event loop: node=worker4:9092 (id: 2147483644 rack: null), request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, node=Optional[worker4:9092 (id: 2147483644 rack: null)], timer=org.apache.kafka.common.utils.Timer@55ed4733} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} This seems to be triggered by a tight poll loop of the network thread. The right thing to do is to back off a bit for that given node and retry later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
Philip Nee created KAFKA-16159: -- Summary: Prune excessive logging from Telemetry Reporter Key: KAFKA-16159 URL: https://issues.apache.org/jira/browse/KAFKA-16159 Project: Kafka Issue Type: Task Components: consumer, log Reporter: Philip Nee Assignee: Apoorv Mittal While running system tests locally, I've noticed excessive logging from the Telemetry Reporter. I believe this was introduced in KIP-714. {code:java} [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, returning the value 224678 ms; the client will wait before submitting the next GetTelemetrySubscriptions network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} This is logged several times per millisecond. Also, given the amount of log being emitted, can we check the CPU profile to see whether a process is running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones
[ https://issues.apache.org/jira/browse/KAFKA-16112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16112. Resolution: Fixed The results of this ticket are KAFKA-16113, KAFKA-16116, and KAFKA-16115. > Review JMX metrics in Async Consumer and determine the missing ones > --- > > Key: KAFKA-16112 > URL: https://issues.apache.org/jira/browse/KAFKA-16112 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
Philip Nee created KAFKA-16116: -- Summary: AsyncKafkaConsumer: Add missing rebalance metrics Key: KAFKA-16116 URL: https://issues.apache.org/jira/browse/KAFKA-16116 Project: Kafka Issue Type: Improvement Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing: |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
Philip Nee created KAFKA-16115: -- Summary: AsyncKafkaConsumer: Add missing heartbeat metrics Key: KAFKA-16115 URL: https://issues.apache.org/jira/browse/KAFKA-16115 Project: Kafka Issue Type: Improvement Components: consumer, metrics Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing: |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
Philip Nee created KAFKA-16113: -- Summary: AsyncKafkaConsumer: Add missing offset commit metrics Key: KAFKA-16113 URL: https://issues.apache.org/jira/browse/KAFKA-16113 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing from the AsyncKafkaConsumer: commit-latency-avg commit-latency-max commit-rate commit-total committed-time-ns-total -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15250) DefaultBackgroundThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15250. Fix Version/s: (was: 3.8.0) Resolution: Fixed > DefaultBackgroundThread is running tight loop > - > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout passed to NetworkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown
[ https://issues.apache.org/jira/browse/KAFKA-15948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15948. Assignee: Philip Nee (was: Phuc Hong Tran) Resolution: Fixed > Refactor AsyncKafkaConsumer shutdown > > > Key: KAFKA-15948 > URL: https://issues.apache.org/jira/browse/KAFKA-15948 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Upon closing we need a round trip from the network thread to the application > thread and then back to the network thread to complete the callback > invocation. Currently, we don't have any of that. I think we need to > refactor our closing mechanism. There are a few points to the refactor: > # The network thread should know if there's a custom user callback to > trigger or not. If there is, it should wait for the callback completion to > send a leave group. If not, it should proceed with the shutdown. > # The application thread sends a closing signal to the network thread and > continuously polls the background event handler until time runs out. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id
Philip Nee created KAFKA-16034: -- Summary: AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id Key: KAFKA-16034 URL: https://issues.apache.org/jira/browse/KAFKA-16034 Project: Kafka Issue Type: Bug Reporter: Philip Nee The consumer logs an Invalid Request error when rejoining with a fenced/unknown member ID because we don't reset the HeartbeatState, so the required fields (rebalanceTimeoutMs, for example) are not sent in the rejoin request. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership
Philip Nee created KAFKA-16027: -- Summary: Refactor MetadataTest#testUpdatePartitionLeadership Key: KAFKA-16027 URL: https://issues.apache.org/jira/browse/KAFKA-16027 Project: Kafka Issue Type: Improvement Reporter: Philip Nee MetadataTest#testUpdatePartitionLeadership is extremely long, close to the 160-line method limit. It also covers two distinct cases, so it is best to split it into two separate tests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16026) AsyncConsumer does not send a poll event to the background thread
Philip Nee created KAFKA-16026: -- Summary: AsyncConsumer does not send a poll event to the background thread Key: KAFKA-16026 URL: https://issues.apache.org/jira/browse/KAFKA-16026 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee consumer poll does not send a poll event to the background thread to: # trigger autocommit # reset max poll interval timer -- This message was sent by Atlassian Jira (v8.20.10#820010)
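The two missing side effects of the poll event can be sketched in plain Java. This is a hypothetical illustration only: `PollEventSketch`, `onPoll`, and `drainEvents` are invented names, not Kafka APIs. Each `poll()` on the application thread enqueues an event; the background thread drains the queue, resetting the poll-interval timer and arming the next autocommit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of KAFKA-16026: poll() hands a PollEvent to the
// background thread, which resets the max-poll-interval timer and arms autocommit.
class PollEventSketch {
    record PollEvent(long pollTimeMs) {}

    static final BlockingQueue<PollEvent> eventQueue = new LinkedBlockingQueue<>();
    static long lastPollMs = -1;            // backs the max.poll.interval.ms timer
    static boolean autocommitArmed = false;

    // Application thread: called from poll().
    static void onPoll(long nowMs) {
        eventQueue.offer(new PollEvent(nowMs));
    }

    // Background thread: drain events and apply their side effects.
    static void drainEvents() {
        PollEvent e;
        while ((e = eventQueue.poll()) != null) {
            lastPollMs = e.pollTimeMs;      // 2. reset the poll-interval timer
            autocommitArmed = true;         // 1. allow the next autocommit to fire
        }
    }

    public static void main(String[] args) {
        onPoll(1_000L);
        drainEvents();
        System.out.println(lastPollMs + " " + autocommitArmed);
    }
}
```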
[jira] [Created] (KAFKA-16024) SaslPlaintextConsumerTest#testCoordinatorFailover is flaky
Philip Nee created KAFKA-16024: -- Summary: SaslPlaintextConsumerTest#testCoordinatorFailover is flaky Key: KAFKA-16024 URL: https://issues.apache.org/jira/browse/KAFKA-16024 Project: Kafka Issue Type: Bug Reporter: Philip Nee The test is flaky with the async consumer as we are observing {code:java} org.opentest4j.AssertionFailedError: Failed to observe commit callback before timeout{code} I was not able to replicate this on my local machine easily. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding
Philip Nee created KAFKA-16023: -- Summary: PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding Key: KAFKA-16023 URL: https://issues.apache.org/jira/browse/KAFKA-16023 Project: Kafka Issue Type: Bug Reporter: Philip Nee Several tests in PlaintextConsumerTest.scala (such as testPerPartitionLagMetricsCleanUpWithSubscribe) use: assertEquals(1, listener.callsToAssigned, "should be assigned once") However, the timing of reconciliation completion is not deterministic due to asynchronous processing, so the tests need to wait for the condition to hold instead of asserting immediately. -- This message was sent by Atlassian Jira (v8.20.10#820010)
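A minimal sketch of the fix, written in plain Java rather than the Scala used by the test, and assuming a `TestUtils.waitUntilTrue`-style helper (the `WaitUntil` class below is hypothetical): poll the condition until it holds or a deadline passes, instead of asserting once.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: wait for a condition instead of asserting immediately,
// since reconciliation completes asynchronously.
class WaitUntil {
    static boolean waitUntilTrue(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (cond.getAsBoolean()) return true;
            try {
                Thread.sleep(10);           // brief backoff between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return cond.getAsBoolean();         // one last check at the deadline
    }

    public static void main(String[] args) {
        // e.g. wait for listener.callsToAssigned == 1 rather than asserting it directly
        System.out.println(waitUntilTrue(() -> true, 100));
    }
}
```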
[jira] [Created] (KAFKA-16022) AsyncKafkaConsumer sometimes complains "No current assignment for partition {}"
Philip Nee created KAFKA-16022: -- Summary: AsyncKafkaConsumer sometimes complains "No current assignment for partition {}" Key: KAFKA-16022 URL: https://issues.apache.org/jira/browse/KAFKA-16022 Project: Kafka Issue Type: Bug Reporter: Philip Nee This appears to be a timing issue: before the member receives any assignment from the coordinator, the fetcher tries to find the current position, causing "No current assignment for partition {}". This adds a small amount of noise to the log. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16019) Some of the tests in PlaintextConsumer cannot deterministically invoke and verify the consumer callback
Philip Nee created KAFKA-16019: -- Summary: Some of the tests in PlaintextConsumer cannot deterministically invoke and verify the consumer callback Key: KAFKA-16019 URL: https://issues.apache.org/jira/browse/KAFKA-16019 Project: Kafka Issue Type: Task Reporter: Philip Nee I was running PlaintextConsumer to test the async consumer; however, a few tests failed because they could not verify that the listener was invoked correctly. For example, in `testPerPartitionLeadMetricsCleanUpWithSubscribe`, around 50% of the time the listener's callsToAssigned was never incremented. Even after changing the assertion to awaitUntilTrue, the result was the same: consumer.subscribe(List(topic, topic2).asJava, listener) val records = awaitNonEmptyRecords(consumer, tp) assertEquals(1, listener.callsToAssigned, "should be assigned once") -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15993) Enable integration tests that rely on rebalance listeners
Philip Nee created KAFKA-15993: -- Summary: Enable integration tests that rely on rebalance listeners Key: KAFKA-15993 URL: https://issues.apache.org/jira/browse/KAFKA-15993 Project: Kafka Issue Type: Task Reporter: Philip Nee We will enable integration tests using the async consumer in KAFKA-15971. However, we should also enable tests that rely on rebalance listeners after KAFKA-15628 is closed. One example is testMaxPollIntervalMs, which relies on the listener to verify correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown
Philip Nee created KAFKA-15948: -- Summary: Refactor AsyncKafkaConsumer shutdown Key: KAFKA-15948 URL: https://issues.apache.org/jira/browse/KAFKA-15948 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Upon closing we need a round trip from the network thread to the application thread and then back to the network thread to complete the callback invocation. Currently, we don't have any of that. I think we need to refactor our closing mechanism. There are a few points to the refactor: # The network thread should know if there's a custom user callback to trigger or not. If there is, it should wait for the callback completion to send a leave group. If not, it should proceed with the shutdown. # The application thread sends a closing signal to the network thread and continuously polls the background event handler until time runs out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of autoretry
Philip Nee created KAFKA-15946: -- Summary: AsyncKafkaConsumer should retry commits on the application thread instead of autoretry Key: KAFKA-15946 URL: https://issues.apache.org/jira/browse/KAFKA-15946 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee The original design was that the network thread always completes the future, whether it succeeds or fails. However, in the current patch, I mistakenly added auto-retry functionality because commitSync wasn't retrying. What we should do instead is have the commitSync API catch RetriableExceptions and resend the commit until it times out.
{code:java}
if (error.exception() instanceof RetriableException) {
    log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
    handleRetriableError(error, response);
    retry(responseTime);  // <-- We probably shouldn't do this.
    return;
}
{code}
and
{code:java}
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
    acquireAndEnsureOpen();
    long commitStart = time.nanoseconds();
    try {
        CompletableFuture<Void> commitFuture = commit(offsets, true);  // <-- we probably should retry here
        ConsumerUtils.getResult(commitFuture, time.timer(timeout));
    } finally {
        wakeupTrigger.clearTask();
        kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
        release();
    }
}
{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
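The proposed direction can be sketched as follows. This is a hypothetical illustration, not the actual consumer code: `RetriableException` and `Committer` below are stand-ins. The application-thread commitSync() catches the retriable error and issues a fresh commit attempt until a bound is hit, while the network thread simply completes each future exactly once.

```java
// Hypothetical sketch of KAFKA-15946: retry commits on the application thread.
class CommitRetrySketch {
    static class RetriableException extends RuntimeException {}

    interface Committer {
        void commitOnce() throws RetriableException;  // one network round trip
    }

    // Returns the number of attempts made; maxAttempts stands in for the commit timer.
    static int commitSync(Committer committer, int maxAttempts) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                committer.commitOnce();
                return attempts;                // success: stop retrying
            } catch (RetriableException e) {
                if (attempts >= maxAttempts)    // "timer expired": surface the error
                    throw e;
            }
        }
    }
}
```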
[jira] [Created] (KAFKA-15942) Implement ConsumerInterceptor
Philip Nee created KAFKA-15942: -- Summary: Implement ConsumerInterceptor Key: KAFKA-15942 URL: https://issues.apache.org/jira/browse/KAFKA-15942 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Philip Nee As title, we need to implement ConsumerInterceptor in the AsyncKafkaConsumer This is the current code. The implementation would be very similar {code:java} if (interceptors != null) interceptors.onCommit(offsets); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15913) Remove excessive use of spy in ConsumerTestBuilder
Philip Nee created KAFKA-15913: -- Summary: Remove excessive use of spy in ConsumerTestBuilder Key: KAFKA-15913 URL: https://issues.apache.org/jira/browse/KAFKA-15913 Project: Kafka Issue Type: Improvement Reporter: Philip Nee ConsumerTestBuilder is meant to be a unit-testing utility; however, we use Mockito#spy quite liberally. This is not the right testing strategy because it effectively turns unit tests into integration tests. While the current unit tests run fine, we should default to Mockito#mock and test each dependency independently. The ask here is: # Use mock(class) by default # Provide a more flexible interface for the test builder so users can configure a spy or a mock, or pass in their own mock. -- This message was sent by Atlassian Jira (v8.20.10#820010)
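Point 2 could look something like this minimal sketch. `TestBuilderSketch` is invented for illustration, not the real ConsumerTestBuilder, and Mockito itself is omitted to keep the sketch self-contained: default to a cheap stub, but let the test pass in its own double (e.g. a Mockito spy).

```java
import java.util.function.Supplier;

// Hypothetical sketch of the requested builder flexibility: mock by default,
// with an escape hatch for tests that really need a spy or custom instance.
class TestBuilderSketch<T> {
    private final Supplier<T> defaultMock;
    private T override;

    TestBuilderSketch(Supplier<T> defaultMock) {
        this.defaultMock = defaultMock;
    }

    // Let the caller supply their own test double (e.g. Mockito.spy(...)).
    TestBuilderSketch<T> with(T instance) {
        this.override = instance;
        return this;
    }

    T build() {
        return override != null ? override : defaultMock.get();
    }

    public static void main(String[] args) {
        TestBuilderSketch<String> b = new TestBuilderSketch<>(() -> "default-mock");
        System.out.println(b.build());
        System.out.println(b.with("custom-spy").build());
    }
}
```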
[jira] [Created] (KAFKA-15872) Investigate autocommit retry logic
Philip Nee created KAFKA-15872: -- Summary: Investigate autocommit retry logic Key: KAFKA-15872 URL: https://issues.apache.org/jira/browse/KAFKA-15872 Project: Kafka Issue Type: Sub-task Reporter: Philip Nee This is purely an investigation ticket. Currently, we send an autocommit only if there isn't an inflight one; however, this logic might not be correct. I think we should: # expire the request if it is not completed in time # always send an autocommit on schedule -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records
[ https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15836. Resolution: Fixed PR merged. > KafkaConsumer subscribes to multiple topics does not respect max.poll.records > - > > Key: KAFKA-15836 > URL: https://issues.apache.org/jira/browse/KAFKA-15836 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Philip Nee >Assignee: Andrew Schofield >Priority: Blocker > Labels: consumer > Fix For: 3.7.0 > > > We discovered that when KafkaConsumer subscribes to multiple topics with > max.poll.records configured, the limit is not properly respected for every > poll() invocation. > > I was able to reproduce it with the AK example; here is how I ran my tests: > [https://github.com/apache/kafka/pull/14772] > > 1. start zookeeper and kafka server (or kraft mode should be fine too) > 2. Run: examples/bin/java-producer-consumer-demo.sh 1000 > 3. Polled records > 400 will be printed to stdout > > Here is what the program does: > The producer produces a large number of records to multiple topics. We > configure the consumer with max.poll.records = 400 and subscribe to multiple > topics. The consumer polls, and the returned batch can sometimes contain more > than 400 records. > > This is an issue in AK 3.6, but 3.5 was fine. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?
Philip Nee created KAFKA-15867: -- Summary: Should ConsumerNetworkThread wrap the exception and notify the polling thread? Key: KAFKA-15867 URL: https://issues.apache.org/jira/browse/KAFKA-15867 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee The ConsumerNetworkThread runs a tight loop infinitely. However, when it encounters an unexpected exception, it logs an error and continues. I think this is not ideal because users can run blind for a long time before discovering something is wrong with the code, so I believe we should propagate the throwable back to the polling thread. cc [~lucasbru] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState
Philip Nee created KAFKA-15866: -- Summary: Refactor OffsetFetchRequestState error handling to be more consistent with OffsetCommitRequestState Key: KAFKA-15866 URL: https://issues.apache.org/jira/browse/KAFKA-15866 Project: Kafka Issue Type: Improvement Reporter: Philip Nee The current OffsetFetchRequestState error handling uses nested if-else blocks, which is stylistically quite different from OffsetCommitRequestState, which uses a switch statement. The latter is more readable, so we should refactor the error handling to use the same style. A minor point: some of the error handling seems inconsistent with the commit path. The logic was carried over from the current implementation, so we should also review all of the error handling. For example, the current logic does not mark the coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE. -- This message was sent by Atlassian Jira (v8.20.10#820010)
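A rough sketch of the switch style the ticket asks for. The enum values and returned action strings below are illustrative stand-ins, not the real Kafka `Errors` enum or handler; note that COORDINATOR_NOT_AVAILABLE explicitly marks the coordinator unknown, the case the ticket flags as missing.

```java
// Hypothetical sketch of switch-based OffsetFetch error handling, mirroring
// the OffsetCommitRequestState style. Enum values and actions are stand-ins.
class FetchErrorHandling {
    enum Errors { NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, UNKNOWN_TOPIC_OR_PARTITION }

    static String handle(Errors error) {
        switch (error) {
            case NONE:
                return "complete";
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // Mark the coordinator unknown, rediscover it, then retry.
                return "mark-coordinator-unknown-and-retry";
            case UNKNOWN_TOPIC_OR_PARTITION:
            default:
                // Non-retriable for this request: fail the future.
                return "fail";
        }
    }
}
```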
[jira] [Created] (KAFKA-15865) Ensure consumer.poll() executes the autocommit callback
Philip Nee created KAFKA-15865: -- Summary: Ensure consumer.poll() executes the autocommit callback Key: KAFKA-15865 URL: https://issues.apache.org/jira/browse/KAFKA-15865 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee When the network thread completes autocommits, we need to send a message/event to the application thread to notify it to execute the callback. In KAFKA-15327, the network thread sends an AutoCommitCompletionBackgroundEvent to the polling thread. The polling thread should trigger the OffsetCommitCallback upon receiving it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15174) Ensure the correct thread is executing the callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15174. Resolution: Fixed > Ensure the correct thread is executing the callbacks > > > Key: KAFKA-15174 > URL: https://issues.apache.org/jira/browse/KAFKA-15174 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > We need to add assertion tests to ensure the correct thread is executing the > offset commit callbacks and rebalance callback -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15836) KafkaConsumer subscribed to multiple topics does not respect max.poll.records
Philip Nee created KAFKA-15836: -- Summary: KafkaConsumer subscribed to multiple topics does not respect max.poll.records Key: KAFKA-15836 URL: https://issues.apache.org/jira/browse/KAFKA-15836 Project: Kafka Issue Type: Bug Affects Versions: 3.6.0 Reporter: Philip Nee Assignee: Kirk True We discovered that when KafkaConsumer subscribes to multiple topics with max.poll.records configured, the limit is not properly respected for every poll() invocation. I was able to reproduce it with the AK example; here is how I ran my tests: [https://github.com/apache/kafka/pull/14772] 1. start zookeeper and kafka server (or kraft mode should be fine too) 2. Run: examples/bin/java-producer-consumer-demo.sh 1000 3. Polled records > 400 will be printed to stdout Here is what the program does: The producer produces a large number of records to multiple topics. We configure the consumer with max.poll.records = 400 and subscribe to multiple topics. The consumer polls, and the returned batch can sometimes contain more than 400 records. This is an issue in AK 3.6, but 3.5 was fine. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15818) Implement max poll interval
Philip Nee created KAFKA-15818: -- Summary: Implement max poll interval Key: KAFKA-15818 URL: https://issues.apache.org/jira/browse/KAFKA-15818 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee In the network thread, we need a timer configured with max.poll.interval.ms. The reason is that if the user doesn't poll the consumer within the interval, the member needs to leave the group. Currently, we send an acknowledgement event to the network thread per poll. It needs to do two things: 1. update the autocommit state 2. reset the max poll interval timer The current logic looks like this:
{code:java}
if (heartbeat.pollTimeoutExpired(now)) {
    // the poll timeout has expired, which means that the foreground thread has stalled
    // in between calls to poll().
    log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
        "was longer than the configured max.poll.interval.ms, which typically implies that " +
        "the poll loop is spending too much time processing messages. You can address this " +
        "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
        "returned in poll() with max.poll.records.");
    maybeLeaveGroup("consumer poll timeout has expired.");
}
{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
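The timer the ticket describes might look like this minimal sketch (`PollIntervalTimer` is an invented name for illustration): the background thread checks expiry on each loop iteration, and each per-poll acknowledgement event resets it.

```java
// Hypothetical sketch of a max.poll.interval.ms timer for the network thread.
class PollIntervalTimer {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalTimer(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Called when the per-poll acknowledgement event arrives from the application thread.
    void onPollEvent(long nowMs) {
        lastPollMs = nowMs;
    }

    // Checked on each network-thread loop; expiry should trigger leaving the group.
    boolean expired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }
}
```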
[jira] [Created] (KAFKA-15803) Update last seen epoch during commit
Philip Nee created KAFKA-15803: -- Summary: Update last seen epoch during commit Key: KAFKA-15803 URL: https://issues.apache.org/jira/browse/KAFKA-15803 Project: Kafka Issue Type: Task Reporter: Philip Nee Assignee: Philip Nee At the time we implemented commitAsync in the PrototypeAsyncConsumer, metadata was not available. The ask here is to investigate whether we need to add the following function to the commit code:
{code:java}
private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
    if (offsetAndMetadata != null)
        offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15686) Consumer should be able to detect network problem
[ https://issues.apache.org/jira/browse/KAFKA-15686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15686. Assignee: Philip Nee Resolution: Won't Fix The user can use the admin client to get the partition leader information. > Consumer should be able to detect network problem > - > > Key: KAFKA-15686 > URL: https://issues.apache.org/jira/browse/KAFKA-15686 > Project: Kafka > Issue Type: New Feature > Components: consumer >Affects Versions: 3.5.0 >Reporter: Jiahongchao >Assignee: Philip Nee >Priority: Minor > > When we call poll method in consumer, it will return normally even if some > partitions do not have a leader. > What should we do to detect such failures? Currently we have to check log to > find out broker connection problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15775) Implement listTopics() and partitionsFor() for the AsyncKafkaConsumer
Philip Nee created KAFKA-15775: -- Summary: Implement listTopics() and partitionsFor() for the AsyncKafkaConsumer Key: KAFKA-15775 URL: https://issues.apache.org/jira/browse/KAFKA-15775 Project: Kafka Issue Type: Task Reporter: Philip Nee
{code:java}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
    return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
}

@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
    throw new KafkaException("method not implemented");
}

@Override
public Map<String, List<PartitionInfo>> listTopics() {
    return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
}

@Override
public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
    throw new KafkaException("method not implemented");
}
{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15679) Client support for new consumer configs
[ https://issues.apache.org/jira/browse/KAFKA-15679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15679. Resolution: Fixed > Client support for new consumer configs > --- > > Key: KAFKA-15679 > URL: https://issues.apache.org/jira/browse/KAFKA-15679 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: kip-848-client-support, kip-848-e2e, kip-848-preview > > New consumer should support the new configs introduced by KIP-848 > |group.protocol|enum|generic|A flag which indicates if the new protocol > should be used or not. It could be: generic or consumer| > |group.remote.assignor|string|null|The server side assignor to use. It cannot > be used in conjunction with group.local.assignor. {{null}} means that the > choice of the assignor is left to the group coordinator.| > The protocol introduces a 3rd property for client side (local) assignors, but > that will be introduced later on. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15531) Ensure coordinator node is removed upon disconnection exception
[ https://issues.apache.org/jira/browse/KAFKA-15531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15531. Resolution: Fixed > Ensure coordinator node is removed upon disconnection exception > --- > > Key: KAFKA-15531 > URL: https://issues.apache.org/jira/browse/KAFKA-15531 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > In the async consumer, the coordinator isn't being removed when receiving the > following exception: > > {code:java} > (e instanceof DisconnectException) { > markCoordinatorUnknown(true, e.getMessage()); > }{code} > > This should happen on all requests going to coordinator node: > 1. heartbeat 2. offset fetch/commit -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15316. Resolution: Fixed > CommitRequestManager not calling RequestState callbacks > > > Key: KAFKA-15316 > URL: https://issues.apache.org/jira/browse/KAFKA-15316 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > > CommitRequestManager is not triggering the RequestState callbacks that update > {_}lastReceivedMs{_}, affecting the _canSendRequest_ verification of the > RequestState -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types
[ https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15562. Resolution: Fixed > Ensure fetch offset and commit offset handler handles both timeout and > various error types > -- > > Key: KAFKA-15562 > URL: https://issues.apache.org/jira/browse/KAFKA-15562 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview > > Both the fetchOffsetRequest and commitOffsetRequest handlers lack sufficient > logic to handle timeout exceptions. > > The commitOffsetRequest handler also doesn't handle various server errors, > such as coordinator not found. We need to handle: > If the exception is non-null: > - handle RetriableError in a way that respects requestTimeoutMs > - handle NonRetriableError > > If the response contains an error, ensure to: > - mark the coordinator unknown if needed > - retry if needed > - fail the request -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15773) Group protocol configuration should be validated
Philip Nee created KAFKA-15773: -- Summary: Group protocol configuration should be validated Key: KAFKA-15773 URL: https://issues.apache.org/jira/browse/KAFKA-15773 Project: Kafka Issue Type: Improvement Reporter: Philip Nee Assignee: Philip Nee If the user specifies the generic group protocol, or does not specify the group.protocol config at all, we should reject any group.remote.assignor setting as invalid. If group.local.assignor and group.remote.assignor are both configured, we should also reject the configuration. This is an optimization/user-experience improvement. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15631) Do not send new heartbeat request while another one in-flight
[ https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15631. Resolution: Not A Problem > Do not send new heartbeat request while another one in-flight > - > > Key: KAFKA-15631 > URL: https://issues.apache.org/jira/browse/KAFKA-15631 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > Client consumer should not send a new heartbeat request while there is a > previous in-flight. If a HB is in-flight, we should wait for a response or > timeout before sending a next one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15550) OffsetsForTimes validation for negative timestamps in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15550. Resolution: Fixed > OffsetsForTimes validation for negative timestamps in new consumer > -- > > Key: KAFKA-15550 > URL: https://issues.apache.org/jira/browse/KAFKA-15550 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848, kip-848-preview > > OffsetsForTimes api call should fail with _IllegalArgumentException_ if > negative timestamps are provided as arguments. This will effectively exclude > earliest and latest offsets as target times, keeping the current behaviour of > the KafkaConsumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15697) Add local assignor and ensure it cannot be used with server side assignor
Philip Nee created KAFKA-15697: -- Summary: Add local assignor and ensure it cannot be used with server side assignor Key: KAFKA-15697 URL: https://issues.apache.org/jira/browse/KAFKA-15697 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee When we start supporting a local/client-side assignor, we should: # Add the config to ConsumerConfig # Determine where to implement the logic that ensures it is not used alongside the server-side assignor, i.e. you can specify a local assignor, a remote assignor, or neither. ## If both assignors are specified: throw IllegalArgumentException -- This message was sent by Atlassian Jira (v8.20.10#820010)
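The mutual-exclusion check in point 2 could be as simple as this sketch. The `AssignorConfigValidator` class and the plain-string parameters are illustrative assumptions, not the real ConsumerConfig validation hook:

```java
// Hypothetical sketch of KAFKA-15697's validation: local and remote assignors
// are mutually exclusive; configuring neither is also acceptable.
class AssignorConfigValidator {
    static void validate(String localAssignor, String remoteAssignor) {
        if (localAssignor != null && remoteAssignor != null)
            throw new IllegalArgumentException(
                "group.local.assignor and group.remote.assignor cannot both be configured");
    }
}
```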
[jira] [Created] (KAFKA-15642) Ensure offset fetcher behaves correctly when the request times out
Philip Nee created KAFKA-15642: -- Summary: Ensure offset fetcher behaves correctly when the request times out Key: KAFKA-15642 URL: https://issues.apache.org/jira/browse/KAFKA-15642 Project: Kafka Issue Type: Bug Reporter: Philip Nee Assignee: Philip Nee I need to test the behavior of OffsetFetcher when the request times out; it seems we should continue to retry until the top-level timer expires. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15592) Member does not need to always try to join a group when a groupId is configured
Philip Nee created KAFKA-15592: -- Summary: Member does not need to always try to join a group when a groupId is configured Key: KAFKA-15592 URL: https://issues.apache.org/jira/browse/KAFKA-15592 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee Currently, instantiating a membershipManager means the member will always seek to join a group unless it has failed fatally. However, this is not always the case because the member should be able to join and leave a group any time during its life cycle. Maybe we should include an "inactive" state in the state machine indicating the member does not want to be in a rebalance group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced
Philip Nee created KAFKA-15588: -- Summary: Purge the unsent offset commits/fetches when the member is fenced Key: KAFKA-15588 URL: https://issues.apache.org/jira/browse/KAFKA-15588 Project: Kafka Issue Type: Bug Reporter: Philip Nee Assignee: Philip Nee When the member is fenced/failed, we should purge the inflight offset commits and fetches. HeartbeatRequestManager should be able to handle this -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15278. Resolution: Fixed > Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} is already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > * Ensure errors are handled correctly > * Ensure MembershipStateManager is updated in both success and failure > cases, and the state machine is transitioned to the correct state. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15562) CommitRequestManager needs to test different error handling
Philip Nee created KAFKA-15562: -- Summary: CommitRequestManager needs to test different error handling Key: KAFKA-15562 URL: https://issues.apache.org/jira/browse/KAFKA-15562 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee Review the code in ConsumerCoordinator#OffsetCommitResponseHandler. Implement the error handling and add tests for these errors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15553) Review committed offset refresh logic
Philip Nee created KAFKA-15553: -- Summary: Review committed offset refresh logic Key: KAFKA-15553 URL: https://issues.apache.org/jira/browse/KAFKA-15553 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee From the existing comment: If there are any partitions which do not have a valid position and are not awaiting reset, then we need to fetch committed offsets. In the async consumer: I wonder if it would make sense to refresh the position on the event loop continuously. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls
Philip Nee created KAFKA-15551: -- Summary: Evaluate conditions for short circuiting consumer API calls Key: KAFKA-15551 URL: https://issues.apache.org/jira/browse/KAFKA-15551 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee Conditions like: * Committing empty offsets * Fetching offsets for empty partitions * Getting positions for empty topic partitions should be short-circuited, possibly at the API level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
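The short-circuit idea above can be sketched as an API-level fast path: an empty commit returns an already-completed future without enqueueing any event for the background thread. This is illustrative only, with invented names, not the real consumer code:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of API-level short-circuiting: an empty offsets map means there is
// nothing to send, so return a completed future immediately instead of
// handing an event to the background thread.
class ShortCircuit {
    // Observability hook for this sketch only.
    static boolean shortCircuited = false;

    static CompletableFuture<Void> commit(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            shortCircuited = true;
            return CompletableFuture.completedFuture(null); // nothing to do
        }
        shortCircuited = false;
        // In the real flow this future would be completed by the background
        // thread once the OffsetCommit response arrives.
        return new CompletableFuture<>();
    }
}
```

The same pattern applies to fetching offsets or positions for an empty partition set.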
[jira] [Created] (KAFKA-15548) Handling close() properly
Philip Nee created KAFKA-15548: -- Summary: Handling close() properly Key: KAFKA-15548 URL: https://issues.apache.org/jira/browse/KAFKA-15548 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee Upon closing we need to: # Complete pending commits # Auto-commit if needed # Send the last ConsumerGroupHeartbeatRequest with epoch = -1 to leave the group # Poll the NetworkClient to complete pending I/O -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15534) Propagate client response time when timeout to the request handler
Philip Nee created KAFKA-15534: -- Summary: Propagate client response time when timeout to the request handler Key: KAFKA-15534 URL: https://issues.apache.org/jira/browse/KAFKA-15534 Project: Kafka Issue Type: Bug Reporter: Philip Nee Assignee: Philip Nee Currently, we don't have a good way to propagate the response time to the handler when a timeout is thrown. {code:java} unsent.handler.onFailure(new TimeoutException( "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); {code} The current request manager invokes a system call to retrieve the response time, which is not ideal because it is already available in the network client. This is an example from the coordinator request manager: {code:java} unsentRequest.future().whenComplete((clientResponse, throwable) -> { long responseTimeMs = time.milliseconds(); if (clientResponse != null) { FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); onResponse(responseTimeMs, response); } else { onFailedResponse(responseTimeMs, throwable); } }); {code} But in the NetworkClientDelegate, we should utilize the currentTimeMs in trySend to avoid calling time.milliseconds(): {code:java} private void trySend(final long currentTimeMs) { ... unsent.handler.onFailure(new TimeoutException( "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); continue; } } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
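The fix sketched in the ticket amounts to threading the poll loop's timestamp through to the failure handler instead of having each handler call time.milliseconds() itself. A minimal model of that idea (interface and names are invented for illustration, not the actual NetworkClientDelegate API):

```java
// Sketch: the poll loop already knows currentTimeMs, so it passes that value
// to the completion handler on timeout rather than issuing another clock read.
class ResponseTimePropagation {
    interface Handler {
        // The handler receives the loop's timestamp alongside the failure.
        void onFailure(long currentTimeMs, Throwable t);
    }

    // Captures the timestamp the handler observed, for this sketch only.
    static long observedMs = -1;

    static void trySend(long currentTimeMs, Handler handler, boolean expired) {
        if (expired) {
            // Reuse the loop's timestamp instead of calling time.milliseconds().
            handler.onFailure(currentTimeMs, new RuntimeException("request timed out"));
        }
    }
}
```

The handler can then record the response time consistently with the rest of the poll iteration.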
[jira] [Created] (KAFKA-15533) Ensure HeartbeatRequestManager only send out some fields once
Philip Nee created KAFKA-15533: -- Summary: Ensure HeartbeatRequestManager only send out some fields once Key: KAFKA-15533 URL: https://issues.apache.org/jira/browse/KAFKA-15533 Project: Kafka Issue Type: Bug Reporter: Philip Nee Assignee: Philip Nee We want to ensure ConsumerGroupHeartbeatRequest is as lightweight as possible, so a lot of fields in it don't need to be resent. An example would be the rebalanceTimeoutMs; currently we have the following code: {code:java} ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData() .setGroupId(membershipManager.groupId()) .setMemberEpoch(membershipManager.memberEpoch()) .setMemberId(membershipManager.memberId()) .setRebalanceTimeoutMs(rebalanceTimeoutMs); {code} We should encapsulate these send-once fields into a class such as HeartbeatMetadataBuilder, and it should maintain a state of whether a certain field needs to be sent or not. Note that, currently, only 3 fields are mandatory in the request: * groupId * memberEpoch * memberId -- This message was sent by Atlassian Jira (v8.20.10#820010)
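The "send once" behavior proposed above can be sketched as a tiny builder that includes the real value in the first request and a sentinel afterwards. This is an illustrative toy, not the proposed HeartbeatMetadataBuilder; the -1 sentinel is an assumption made here:

```java
// Sketch: fields like rebalanceTimeoutMs are included in the first heartbeat
// and omitted (signalled by a -1 sentinel in this toy) on subsequent ones,
// keeping the recurring request lightweight.
class SendOnceHeartbeat {
    private final int rebalanceTimeoutMs;
    private boolean rebalanceTimeoutSent = false;

    SendOnceHeartbeat(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    // Returns the value to put on the wire: the real value once, then the sentinel.
    int nextRebalanceTimeoutField() {
        if (rebalanceTimeoutSent) {
            return -1; // already sent; omit from subsequent heartbeats
        }
        rebalanceTimeoutSent = true;
        return rebalanceTimeoutMs;
    }
}
```

The builder would reset its "sent" flags whenever the member rejoins, since a new epoch requires resending the full metadata.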
[jira] [Created] (KAFKA-15531) Ensure coordinator node is removed upon disconnection exception
Philip Nee created KAFKA-15531: -- Summary: Ensure coordinator node is removed upon disconnection exception Key: KAFKA-15531 URL: https://issues.apache.org/jira/browse/KAFKA-15531 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee In the async consumer, the coordinator isn't being removed when receiving the following exception: {code:java} if (e instanceof DisconnectException) { markCoordinatorUnknown(true, e.getMessage()); }{code} This should happen on all requests going to the coordinator node: 1. heartbeat 2. offset fetch/commit -- This message was sent by Atlassian Jira (v8.20.10#820010)
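The behavior the ticket asks for can be sketched as one failure path shared by all coordinator-bound requests (heartbeat, offset commit/fetch) that clears the cached coordinator on disconnection, forcing a FindCoordinator lookup. Names here are illustrative, and DisconnectException is modeled by a plain local class rather than Kafka's:

```java
import java.util.Optional;

// Sketch: a disconnection on any coordinator-bound request clears the cached
// coordinator node so the next poll triggers rediscovery.
class CoordinatorTracker {
    // Local stand-in for org.apache.kafka.common.errors.DisconnectException.
    static class DisconnectException extends RuntimeException {}

    private Optional<String> coordinator = Optional.of("worker4:9092");

    // Shared failure handler for heartbeat and offset commit/fetch requests.
    void onRequestFailure(Throwable e) {
        if (e instanceof DisconnectException) {
            coordinator = Optional.empty(); // force a FindCoordinator round trip
        }
    }

    boolean coordinatorKnown() {
        return coordinator.isPresent();
    }
}
```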
[jira] [Created] (KAFKA-15475) Timeout request might retry forever even if the user API times out in AsyncConsumer
Philip Nee created KAFKA-15475: -- Summary: Timeout request might retry forever even if the user API times out in AsyncConsumer Key: KAFKA-15475 URL: https://issues.apache.org/jira/browse/KAFKA-15475 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee If the request times out in the background thread, it will be completed with a TimeoutException, which is retriable. In the TopicMetadataRequestManager and possibly other managers, the request might continue to be retried forever. There are two ways to fix this: # Pass a timer to the manager to remove the in-flight requests when they expire. # Pass the future to the application layer and continue to retry there. -- This message was sent by Atlassian Jira (v8.20.10#820010)
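Option 1 from the ticket can be sketched as attaching a deadline to each in-flight request and dropping expired ones on every poll, so a retriable timeout cannot loop forever. A minimal model under invented names:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: each in-flight request carries an absolute deadline; the manager
// removes expired requests on each poll instead of retrying indefinitely.
class InflightExpiry {
    static class Inflight {
        final long deadlineMs;
        Inflight(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    final List<Inflight> inflight = new ArrayList<>();

    void add(long nowMs, long timeoutMs) {
        inflight.add(new Inflight(nowMs + timeoutMs));
    }

    // Called on each poll: remove requests whose deadline has passed and
    // return how many were expired (a real manager would fail their futures).
    int expire(long nowMs) {
        int removed = 0;
        for (Iterator<Inflight> it = inflight.iterator(); it.hasNext(); ) {
            if (it.next().deadlineMs <= nowMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```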
[jira] [Created] (KAFKA-15474) AbstractCoordinator.testWakeupAfterSyncGroupReceivedExternalCompletion seems flaky
Philip Nee created KAFKA-15474: -- Summary: AbstractCoordinator.testWakeupAfterSyncGroupReceivedExternalCompletion seems flaky Key: KAFKA-15474 URL: https://issues.apache.org/jira/browse/KAFKA-15474 Project: Kafka Issue Type: Test Affects Versions: 3.6.0 Reporter: Philip Nee Ran into test failures when running the full AbstractCoordinatorTest unit test suite. It is not very easy to reproduce; I seem to need to run the full unit module to "sometimes" reproduce it. {code:java} Should have woken up from ensureActiveGroup() org.opentest4j.AssertionFailedError: Should have woken up from ensureActiveGroup() at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:135) at app//org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest.testWakeupAfterSyncGroupReceivedExternalCompletion(AbstractCoordinatorTest.java:1531) at java.base@15.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) {code} testWakeupAfterSyncGroupReceived also appears flaky: {code:java} Should have woken up from ensureActiveGroup() org.opentest4j.AssertionFailedError: Should have woken up from ensureActiveGroup() at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:135) at app//org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest.testWakeupAfterSyncGroupReceived(AbstractCoordinatorTest.java:1498) at java.base@15.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@15.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15333) Flaky build failure throwing Connect Exception: Could not connect to server....
Philip Nee created KAFKA-15333: -- Summary: Flaky build failure throwing Connect Exception: Could not connect to server Key: KAFKA-15333 URL: https://issues.apache.org/jira/browse/KAFKA-15333 Project: Kafka Issue Type: Test Components: connect, unit tests Reporter: Philip Nee We frequently observe flaky build failures with the following message. This is from the most recent PR, post-3.5.0: {code:java} > Task :generator:testClasses UP-TO-DATE Unexpected exception thrown. org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:38354'. at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:94) at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270) at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64) at org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:47) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.IllegalArgumentException at org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:72) at org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:52) at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:81) ... 6 more > Task :streams:upgrade-system-tests-26:unitTest org.gradle.internal.remote.internal.ConnectException: Could not connect to server [3156f144-9a89-4c47-91ad-88a8378ec726 port:37889, addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1]. 
at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67) at org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:103) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65) at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716) at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:122) at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81) at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54) ... 5 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
Philip Nee created KAFKA-15305: -- Summary: The background thread should try to process the remaining task until the shutdown timer is expired Key: KAFKA-15305 URL: https://issues.apache.org/jira/browse/KAFKA-15305 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee While working on https://issues.apache.org/jira/browse/KAFKA-15304: the close() API supplies a timeout parameter so that the consumer can have a grace period to process things before shutting down. The background thread currently doesn't do that; when close() is initiated, it will immediately close all of its dependencies. This might not be desirable because there could be remaining tasks to be processed before closing. Maybe the correct thing to do is to first stop accepting API requests; second, let runOnce() continue to run until the shutdown timer expires; then we can force-close all of its dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
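The proposed close sequence above can be sketched as a loop: stop accepting new events, keep invoking runOnce() while tasks remain and the grace period has not elapsed, then force-close whatever is left. This toy model uses a simulated clock and invented names:

```java
// Sketch of a graceful shutdown with a grace period: runOnce() keeps draining
// tasks until either the work is done or the shutdown timer expires.
class GracefulShutdown {
    private int remainingTasks;

    GracefulShutdown(int tasks) {
        this.remainingTasks = tasks;
    }

    // Each runOnce() processes exactly one task in this toy model.
    private void runOnce() {
        if (remainingTasks > 0) {
            remainingTasks--;
        }
    }

    // Simulated clock: each iteration costs msPerIteration. Returns the number
    // of tasks left unprocessed when the timer expired (0 = clean shutdown).
    int close(long timeoutMs, long msPerIteration) {
        long elapsed = 0;
        while (remainingTasks > 0 && elapsed < timeoutMs) {
            runOnce();
            elapsed += msPerIteration;
        }
        return remainingTasks; // a real consumer would force-close these
    }
}
```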
[jira] [Created] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing
Philip Nee created KAFKA-15304: -- Summary: CompletableApplicationEvents aren't being completed when the consumer is closing Key: KAFKA-15304 URL: https://issues.apache.org/jira/browse/KAFKA-15304 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee If the background thread is closed before ingesting all ApplicationEvents, we should drain the background queue and try to cancel these events before closing. We can try to process these events before closing down the consumer; however, we assume that when the user issues a close command, the consumer should be shut down promptly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15250) DefaultBackgroundThread is running tight loop
Philip Nee created KAFKA-15250: -- Summary: DefaultBackgroundThread is running tight loop Key: KAFKA-15250 URL: https://issues.apache.org/jira/browse/KAFKA-15250 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I think we need to reexamine the timeout passed to NetworkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
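One common way to avoid such a tight loop, sketched here under invented names rather than Kafka's actual code, is to compute the poll timeout as the minimum over each request manager's "time until next action", so the network poll blocks instead of spinning:

```java
// Sketch: derive the background thread's poll timeout from the managers'
// backoffs so the thread sleeps until the earliest pending action.
class PollTimeout {
    static long timeToNextPoll(long maxTimeoutMs, long... managerBackoffsMs) {
        long timeout = maxTimeoutMs;
        for (long backoff : managerBackoffsMs) {
            timeout = Math.min(timeout, backoff);
        }
        // The caller would pass this value into the network client's poll().
        return timeout;
    }
}
```

A zero backoff from any manager still yields an immediate poll, so ready work is never delayed.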
[jira] [Created] (KAFKA-15175) Assess the use of nio2 asynchronous channel for KafkaConsumer
Philip Nee created KAFKA-15175: -- Summary: Assess the use of nio2 asynchronous channel for KafkaConsumer Key: KAFKA-15175 URL: https://issues.apache.org/jira/browse/KAFKA-15175 Project: Kafka Issue Type: Wish Components: consumer Reporter: Philip Nee We should assess whether NIO2 is an appropriate replacement for the current NIO library, for better performance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15174) Ensure the correct thread is executing the callbacks
Philip Nee created KAFKA-15174: -- Summary: Ensure the correct thread is executing the callbacks Key: KAFKA-15174 URL: https://issues.apache.org/jira/browse/KAFKA-15174 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee We need to add assertion tests to ensure the correct thread is executing the offset commit callbacks and rebalance callbacks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15173) ApplicationEventQueue and BackgroundEventQueue should be bounded
Philip Nee created KAFKA-15173: -- Summary: ApplicationEventQueue and BackgroundEventQueue should be bounded Key: KAFKA-15173 URL: https://issues.apache.org/jira/browse/KAFKA-15173 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee The async consumer uses ApplicationEventQueue and BackgroundEventQueue to facilitate message passing between the application thread and the background thread. The current implementation is unbounded, which can potentially cause OOM and other performance-related issues. I think the queues need a finite bound, and we need to decide how to handle the situation when the bound is reached. In particular, I would like to answer these questions: # What should the upper limit be for both queues: Can this be a configurable, memory-based bound? Or just an arbitrary number of events as the bound. # What should happen when the application event queue is filled up? It seems like we should introduce a new exception type and notify the user that the consumer is full. # What should happen when the background event queue is filled up? This seems less likely to happen, but I imagine it could happen when the user stops polling the consumer, causing the queue to be filled. # Is it necessary to introduce a public configuration for the queue? I think initially we would select an arbitrary constant number and gather community feedback to plan accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
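The bound discussed above can be sketched with a fixed-capacity queue whose offer() fails fast when full; the caller would surface that as an error (the ticket suggests a dedicated exception type). The capacity and names here are arbitrary illustrations:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of a bounded event queue: add() returns false instead of letting the
// queue grow without bound; a real consumer might throw a dedicated exception.
class BoundedEventQueue {
    private final BlockingQueue<Object> events;

    BoundedEventQueue(int capacity) {
        this.events = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking enqueue: false signals "queue full" to the caller.
    boolean add(Object event) {
        return events.offer(event);
    }

    Object poll() {
        return events.poll();
    }
}
```

Choosing between failing fast and blocking the application thread is exactly the policy question the ticket raises.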
[jira] [Created] (KAFKA-14977) testDescribeStateOfExistingGroupWithRoundRobinAssignor is flaky
Philip Nee created KAFKA-14977: -- Summary: testDescribeStateOfExistingGroupWithRoundRobinAssignor is flaky Key: KAFKA-14977 URL: https://issues.apache.org/jira/browse/KAFKA-14977 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Philip Nee Attachments: failed_test.log Relevant tickets: - KAFKA-8110 Flaky Test DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions - KAFKA-7969 Flaky Test DescribeConsumerGroupTest#testDescribeOffsetsOfExistingGroupWithNoMembers - KAFKA-8068 Flaky Test DescribeConsumerGroupTest#testDescribeMembersOfExistingGroup - KAFKA-8706 Kafka 2.3.0 Transient Unit Test Failures on Oracle Linux - See attached for details See the attachment for the failure. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13668) Failed cluster authorization should not be fatal for producer
[ https://issues.apache.org/jira/browse/KAFKA-13668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-13668. Fix Version/s: 3.6.0 Resolution: Fixed > Failed cluster authorization should not be fatal for producer > - > > Key: KAFKA-13668 > URL: https://issues.apache.org/jira/browse/KAFKA-13668 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Philip Nee >Priority: Major > Fix For: 3.6.0 > > > The idempotent producer fails fatally if the initial `InitProducerId` returns > CLUSTER_AUTHORIZATION_FAILED. This makes the producer unusable until a new > instance is constructed. For some applications, it is more convenient to keep > the producer instance active and let the administrator fix the permission > problem instead of going into a crash loop. Additionally, most applications > will probably not be smart enough to reconstruct the producer instance, so if > the application does not handle the error by failing, users will have to > restart the application manually. > I think it would be better to let the producer retry the `InitProducerId` > request as long as the user keeps trying to use the producer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-14016. Fix Version/s: 3.5.0 3.4.1 Assignee: Philip Nee Resolution: Fixed > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.3.0 >Reporter: Shawn Wang >Assignee: Philip Nee >Priority: Major > Labels: new-rebalance-should-fix > Fix For: 3.5.0, 3.4.1 > > Attachments: CooperativeStickyAssignorBugReproduction.java > > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some > consumers didn't reset generation and state after sync group failed with a > REBALANCE_IN_PROGRESS error. > So we fixed it by resetting the generationId (but not the memberId) when sync group fails with a > REBALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in > https://issues.apache.org/jira/browse/KAFKA-13891 makes this work. > After applying this change, we found that sometimes a consumer will revoke > almost 2/3 of the partitions with cooperative rebalancing enabled, because if a consumer > does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in > syncGroup and revoke their partitions before re-joining. 
For example: > # consumers A1-A10 (ten consumers) joined and synced the group successfully with > generation 1 > # New consumer B1 joined and started a rebalance > # All consumers joined successfully, and then A1 needed to revoke partitions to > transfer to B1 > # A1 did a very quick syncGroup and re-join, because it revoked partitions > # A2-A10 didn't send syncGroup before A1 re-joined, so after they sent > syncGroup, they got REBALANCE_IN_PROGRESS > # A2-A10 then revoked their partitions and re-joined > So in this rebalance almost every partition is revoked, which greatly decreases > the benefit of cooperative rebalance. > I think instead of "{*}resetStateAndRejoin{*} when > *RebalanceInProgressException* errors happen in {*}sync group{*}" we need > another way to fix it. > Here is my proposal: > # Revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # In the server coordinator's handleSyncGroup, when the generationId checks out and the group > state is PreparingRebalance, we can send the assignment along with the error > code REBALANCE_IN_PROGRESS (I think it's safe since we verified the > generation first) > # When the client gets the REBALANCE_IN_PROGRESS error, try to apply the > assignment first and then set rejoinNeeded = true to make it re-join > immediately -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in cooperative
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-13891. Fix Version/s: 3.5.0 3.4.1 (was: 3.6.0) Resolution: Fixed > sync group failed with rebalanceInProgress error cause rebalance many rounds > in cooperative > --- > > Key: KAFKA-13891 > URL: https://issues.apache.org/jira/browse/KAFKA-13891 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Shawn Wang >Assignee: Philip Nee >Priority: Major > Fix For: 3.5.0, 3.4.1 > > > This issue was first found in > [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419] > But the previous PR forgot to reset the generation when sync group failed with a > rebalanceInProgress error. So the previous bug still exists and it may cause the > consumer to rebalance many rounds before finally stabilizing. > Here's the example ({*}bold is added{*}): > # consumer A joined and synced the group successfully with generation 1 *( with > ownedPartition P1/P2 )* > # A new rebalance started with generation 2; consumer A joined successfully, > but somehow, consumer A doesn't send out sync group immediately > # other consumers completed sync group successfully in generation 2, except > consumer A. > # After consumer A sends out sync group, a new rebalance starts, with > generation 3. So consumer A got a REBALANCE_IN_PROGRESS error with the sync group > response > # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with > generation 3, with the assignment (ownedPartition) from generation 1. > # So, now, we have an out-of-date ownedPartition sent, with unexpected results > # *After the generation-3 rebalance, consumer A got partitions P3/P4; the > ownedPartition is ignored because of the old generation.* > # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance* > # *if some other consumer C failed to syncGroup before consumer A's > joinGroup. 
the same issue will happen again and result in many rounds of > rebalance before stabilizing* > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14960) Metadata Request Manager and listTopics/partitionsFor API
Philip Nee created KAFKA-14960: -- Summary: Metadata Request Manager and listTopics/partitionsFor API Key: KAFKA-14960 URL: https://issues.apache.org/jira/browse/KAFKA-14960 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee Implement listTopics and partitionsFor -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14950) Implement assign() and assignment()
Philip Nee created KAFKA-14950: -- Summary: Implement assign() and assignment() Key: KAFKA-14950 URL: https://issues.apache.org/jira/browse/KAFKA-14950 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee Implement assign() and assignment() -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14875) Implement Wakeup()
Philip Nee created KAFKA-14875: -- Summary: Implement Wakeup() Key: KAFKA-14875 URL: https://issues.apache.org/jira/browse/KAFKA-14875 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee Implement wakeup() and WakeupException. This would be different from the current implementation because I think we just need to interrupt the blocking futures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14754) improve consumer under example package
[ https://issues.apache.org/jira/browse/KAFKA-14754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-14754. Resolution: Fixed > improve consumer under example package > -- > > Key: KAFKA-14754 > URL: https://issues.apache.org/jira/browse/KAFKA-14754 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Philip Nee >Priority: Major > > I found the producer and consumer examples are not in good form. For example: > # Neither the consumer nor the producer gracefully closes its resources after > completing > # The examples don't show how to handle different kinds of > exceptions; they are just happy-path examples > # There are no clear comments to instruct users why we should do this, and what each > operation is doing. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14753) improve producer under example package
[ https://issues.apache.org/jira/browse/KAFKA-14753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-14753. Resolution: Fixed > improve producer under example package > -- > > Key: KAFKA-14753 > URL: https://issues.apache.org/jira/browse/KAFKA-14753 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > > I found the producer and consumer examples are not in good form. For example: > # Neither the consumer nor the producer gracefully closes its resources after > completing > # The examples don't show how to handle different kinds of > exceptions; they are just happy-path examples > # There are no clear comments to instruct users why we should do this, and what each > operation is doing. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-14752) improve kafka examples under examples package
[ https://issues.apache.org/jira/browse/KAFKA-14752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reopened KAFKA-14752: > improve kafka examples under examples package > - > > Key: KAFKA-14752 > URL: https://issues.apache.org/jira/browse/KAFKA-14752 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Labels: newbie, newbie++ > > Kafka provides some examples under the "examples" package. Currently we provide > * java-producer-consumer-demo, which is to produce 1 records and then > consume all of them > * exactly-once-demo, which is to produce records -> consume -> process -> > consume. > Among them, the base components are the producer and consumer. However, I found the > producer and consumer examples are not in good form. For example: > # Neither the consumer nor the producer gracefully closes its resources after > completing > # The examples don't show how to handle different kinds of > exceptions; they are just happy-path examples > # There are no clear comments to instruct users why we should do this, and what each > operation is doing. > > Furthermore, while running both examples, I saw a flood of log output > because we print one line per message sent and one line per message > received. In java-producer-consumer-demo, there will be 1 records > sent/received, so > 2 lines of log output. Same for exactly-once-demo. > Maybe we should consider reducing the record number. > > One more thing: in exactly-once-demo.java, there is a clear class javadoc in > the demo file, but there's nothing in java-producer-consumer-demo.java. We > should also improve that. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14752) improve kafka examples under examples package
[ https://issues.apache.org/jira/browse/KAFKA-14752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-14752. Resolution: Fixed > improve kafka examples under examples package > - > > Key: KAFKA-14752 > URL: https://issues.apache.org/jira/browse/KAFKA-14752 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Federico Valeri >Priority: Major > Labels: newbie, newbie++ > > Kafka provides some examples under the "examples" package. Currently we provide > * java-producer-consumer-demo, which is to produce 1 records and then > consume all of them > * exactly-once-demo, which is to produce records -> consume -> process -> > consume. > Among them, the base components are the producer and consumer. However, I found the > producer and consumer examples are not in good form. For example: > # Neither the consumer nor the producer gracefully closes its resources after > completing > # The examples don't show how to handle different kinds of > exceptions; they are just happy-path examples > # There are no clear comments to instruct users why we should do this, and what each > operation is doing. > > Furthermore, while running both examples, I saw a flood of log output > because we print one line per message sent and one line per message > received. In java-producer-consumer-demo, there will be 1 records > sent/received, so > 2 lines of log output. Same for exactly-once-demo. > Maybe we should consider reducing the record number. > > One more thing: in exactly-once-demo.java, there is a clear class javadoc in > the demo file, but there's nothing in java-producer-consumer-demo.java. We > should also improve that. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14761) Integration Tests for the New Consumer Implementation
Philip Nee created KAFKA-14761: -- Summary: Integration Tests for the New Consumer Implementation Key: KAFKA-14761 URL: https://issues.apache.org/jira/browse/KAFKA-14761 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee This Jira tracks the integration-testing effort for the new consumer we are implementing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14468) Refactor Commit Logic
Philip Nee created KAFKA-14468: -- Summary: Refactor Commit Logic Key: KAFKA-14468 URL: https://issues.apache.org/jira/browse/KAFKA-14468 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee Refactor commit logic using the new multi-threaded coordinator construct. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14438) Stop supporting empty consumer groupId
Philip Nee created KAFKA-14438: -- Summary: Stop supporting empty consumer groupId Key: KAFKA-14438 URL: https://issues.apache.org/jira/browse/KAFKA-14438 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Fix For: 4.0.0 Currently, a warning message is logged upon using an empty consumer groupId. In the next major release, we should drop support for the empty ("") consumer groupId. cc [~hachikuji] -- This message was sent by Atlassian Jira (v8.20.10#820010)
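The proposed 4.0 behavior above amounts to failing fast at construction time instead of only logging a warning. A minimal sketch, assuming a plain IllegalArgumentException for illustration (the real consumer may use a dedicated exception type):

```java
// Sketch: reject the empty group id up front rather than warning about it.
class GroupIdValidation {
    static void validateGroupId(String groupId) {
        // null means "no group", which remains legal for standalone assignment.
        if (groupId != null && groupId.isEmpty()) {
            throw new IllegalArgumentException("The empty consumer group id is not supported");
        }
    }
}
```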
[jira] [Resolved] (KAFKA-14252) Create background thread skeleton
[ https://issues.apache.org/jira/browse/KAFKA-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-14252. Resolution: Fixed > Create background thread skeleton > - > > Key: KAFKA-14252 > URL: https://issues.apache.org/jira/browse/KAFKA-14252 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The event handler internally instantiates a background thread to consume > ApplicationEvents and produce BackgroundEvents. In this ticket, we will > create a skeleton of the background thread. We will incrementally add > implementation in the future.
[jira] [Created] (KAFKA-14274) Implement fetching logic
Philip Nee created KAFKA-14274: -- Summary: Implement fetching logic Key: KAFKA-14274 URL: https://issues.apache.org/jira/browse/KAFKA-14274 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee The fetch request and fetch processing should happen asynchronously. More specifically, the background thread sends fetch requests autonomously and relays the responses back to the polling thread. The polling thread collects these fetch responses and returns them as ConsumerRecords.
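The hand-off described in this ticket can be sketched with a small buffer between the two threads; this is an illustrative stand-in for Kafka's internals, not the actual classes, and all names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the described flow: the background thread completes fetches on
// its own schedule and enqueues the responses; the polling thread drains
// whatever has arrived when the application calls poll().
class CompletedFetchBuffer<T> {
    private final BlockingQueue<List<T>> completedFetches = new LinkedBlockingQueue<>();

    // Background-thread side: relay a completed fetch response to the polling thread.
    void relay(List<T> batch) {
        completedFetches.offer(batch);
    }

    // Polling-thread side: collect all completed fetches without blocking.
    List<T> drain() {
        List<T> records = new ArrayList<>();
        List<T> batch;
        while ((batch = completedFetches.poll()) != null) {
            records.addAll(batch);
        }
        return records;
    }
}
```

The key property is that `drain()` never blocks, so the polling thread returns promptly even when no fetch has completed yet.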
[jira] [Created] (KAFKA-14264) Create a coordinator class for the new implementation
Philip Nee created KAFKA-14264: -- Summary: Create a coordinator class for the new implementation Key: KAFKA-14264 URL: https://issues.apache.org/jira/browse/KAFKA-14264 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee To refactor the consumer, we changed how the coordinator is called. However, there will be a period where the old and new implementations need to coexist, so we will need to override some of the methods and create a new implementation of the coordinator. In particular: # ensureCoordinatorReady needs to be non-blocking, or we could just use sendFindCoordinatorRequest. # joinGroupIfNeeded needs to be broken up into finer-grained stages for the new implementation to work.
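One way to read "broken up into finer-grained stages" is turning joinGroupIfNeeded's blocking loop into an explicit state machine that a non-blocking event loop can advance one step per iteration. The sketch below is hypothetical (the state names are illustrative, not Kafka's actual internals):

```java
// Hypothetical state machine for a non-blocking group join: the event
// loop calls pollOnce() repeatedly instead of blocking until stable.
class GroupJoinStateMachine {
    enum State { UNJOINED, FIND_COORDINATOR, JOIN_GROUP, SYNC_GROUP, STABLE }

    private State state = State.UNJOINED;

    State state() {
        return state;
    }

    // Advance at most one stage per call; real code would only transition
    // when the corresponding request's response has arrived.
    State pollOnce() {
        switch (state) {
            case UNJOINED:         state = State.FIND_COORDINATOR; break; // send FindCoordinator
            case FIND_COORDINATOR: state = State.JOIN_GROUP; break;       // coordinator known; send JoinGroup
            case JOIN_GROUP:       state = State.SYNC_GROUP; break;       // JoinGroup response received
            case SYNC_GROUP:       state = State.STABLE; break;           // assignment received
            case STABLE:           break;                                 // nothing to do
        }
        return state;
    }
}
```

Each stage maps to one network round trip, which is what lets the background thread interleave coordinator discovery with other work.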
[jira] [Created] (KAFKA-14252) Create background thread skeleton
Philip Nee created KAFKA-14252: -- Summary: Create background thread skeleton Key: KAFKA-14252 URL: https://issues.apache.org/jira/browse/KAFKA-14252 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee The event handler internally instantiates a background thread to consume ApplicationEvents and produce BackgroundEvents. In this ticket, we will create a skeleton of the background thread. We will incrementally add implementation in the future. -- This message was sent by Atlassian Jira (v8.20.10#820010)
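A minimal sketch of such a skeleton is below; it is an assumption of shape only (string events stand in for ApplicationEvent/BackgroundEvent, and the thread name is illustrative), not Kafka's actual implementation:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Skeleton of a background thread that consumes request events and
// produces response events; real processing replaces the placeholder.
class BackgroundThread extends Thread {
    private final BlockingQueue<String> applicationEvents;  // requests from the polling thread
    private final BlockingQueue<String> backgroundEvents;   // responses back to it
    private final AtomicBoolean running = new AtomicBoolean(true);

    BackgroundThread(BlockingQueue<String> in, BlockingQueue<String> out) {
        super("consumer-background-thread");
        this.applicationEvents = in;
        this.backgroundEvents = out;
        setDaemon(true);
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                String event = applicationEvents.take();     // block for the next request
                backgroundEvents.offer("handled:" + event);  // placeholder for real processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() {
        running.set(false);
        interrupt(); // wake the thread if it is blocked in take()
    }
}
```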
[jira] [Created] (KAFKA-14247) Implement EventHandler interface and DefaultEventHandler
Philip Nee created KAFKA-14247: -- Summary: Implement EventHandler interface and DefaultEventHandler Key: KAFKA-14247 URL: https://issues.apache.org/jira/browse/KAFKA-14247 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Philip Nee Assignee: Philip Nee The polling thread uses events to communicate with the background thread. The events sent to the background thread are the {_}Requests{_}, and the events sent from the background thread to the polling thread are the {_}Responses{_}. Here we have an EventHandler interface and a DefaultEventHandler implementation. The implementation uses two blocking queues to send events both ways. The two methods, add and poll, allow the client, i.e., the polling thread, to add events to and retrieve events from the handler. PR: https://github.com/apache/kafka/pull/12663
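The two-queue design described here can be sketched as follows; this is a simplified stand-in for the real classes in the linked PR, and the background-side helper methods (takeRequest, deliverResponse) are hypothetical additions for completeness:

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-ins for the request/response event types named in the ticket.
class ApplicationEvent {}
class BackgroundEvent {}

interface EventHandler {
    boolean add(ApplicationEvent event);   // polling thread -> background thread
    Optional<BackgroundEvent> poll();      // background thread -> polling thread
}

class DefaultEventHandler implements EventHandler {
    private final BlockingQueue<ApplicationEvent> applicationEvents = new LinkedBlockingQueue<>();
    private final BlockingQueue<BackgroundEvent> backgroundEvents = new LinkedBlockingQueue<>();

    @Override
    public boolean add(ApplicationEvent event) {
        return applicationEvents.offer(event);
    }

    @Override
    public Optional<BackgroundEvent> poll() {
        // Non-blocking: the polling thread must not stall in poll().
        return Optional.ofNullable(backgroundEvents.poll());
    }

    // Background-thread side (hypothetical helpers): consume requests, emit responses.
    ApplicationEvent takeRequest() throws InterruptedException {
        return applicationEvents.take();
    }

    void deliverResponse(BackgroundEvent event) {
        backgroundEvents.offer(event);
    }
}
```

Using two queues keeps each direction single-producer/single-consumer, so neither thread ever holds a lock the other needs.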
[jira] [Created] (KAFKA-14246) Refactor KafkaConsumer Threading Model
Philip Nee created KAFKA-14246: -- Summary: Refactor KafkaConsumer Threading Model Key: KAFKA-14246 URL: https://issues.apache.org/jira/browse/KAFKA-14246 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Philip Nee Assignee: Philip Nee Hi community, We are refactoring the current KafkaConsumer to make it more asynchronous. This is the proposal's placeholder; subtasks will be linked to this ticket. Please review the design document and feel free to use this thread for discussion. The design document is here: [https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor] The original email thread is here: https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l I will continue to update the one-pager as reviews and comments come in. Thanks, P
[jira] [Created] (KAFKA-14196) Flaky OffsetValidationTest seems to indicate potential duplication issue during rebalance
Philip Nee created KAFKA-14196: -- Summary: Flaky OffsetValidationTest seems to indicate potential duplication issue during rebalance Key: KAFKA-14196 URL: https://issues.apache.org/jira/browse/KAFKA-14196 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.2.1 Reporter: Philip Nee Several flaky tests under OffsetValidationTest indicate a potential consumer duplication issue when autocommit is enabled. Below is the failure message: {code:java} Total consumed records 3366 did not match consumed position 3331 {code} After investigating the logs, I discovered that the data consumed between the start of a rebalance event and the async commit was lost for those failing tests. In the example below, the rebalance event kicks in at around 1662054846995 (first record), and the async commit of offset 3739 completes at around 1662054847015 (right before partitions_revoked). {code:java} {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]} {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]} {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]} {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]} {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]} {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]} {code} A few things to note here: # This is highly flaky; I found roughly 1 in 4 runs fails the test # Manually calling commitSync in the onPartitionsRevoked callback seems to alleviate the issue # Setting
includeMetadataInTimeout to false also seems to alleviate the issue.
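Mitigation #2 above (committing synchronously when partitions are revoked) can be sketched with the standard kafka-clients ConsumerRebalanceListener API. This is a fragment rather than a runnable program — it needs a configured consumer and a running broker — and the class name, method name, and topic are illustrative:

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class SyncCommitOnRevoke {
    // Commit synchronously before giving up partitions so that offsets
    // consumed just before the rebalance are not replayed to the new owner.
    static void subscribe(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("test_topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // A pending async commit can be lost across the rebalance;
                // a blocking commit here flushes the current position.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing extra is needed on assignment for this mitigation.
            }
        });
    }
}
```

The trade-off is a blocking call on the rebalance path, which lengthens the rebalance but closes the window in which an autocommit can be dropped.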