[jira] [Resolved] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-05-22 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16160.

Resolution: Cannot Reproduce

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> --
>
> Key: KAFKA-16160
> URL: https://issues.apache.org/jira/browse/KAFKA-16160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> While running AsyncKafkaConsumer, we observed excessive logging of:
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, groupId=concurrent_consumer] Node is not ready, handle the request in the next event loop: node=worker4:9092 (id: 2147483644 rack: null), request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, node=Optional[worker4:9092 (id: 2147483644 rack: null)], timer=org.apache.kafka.common.utils.Timer@55ed4733} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop in the network thread.  The 
> right thing to do is to back off a bit for that given node and retry later.
> This should be a blocker for the 3.8 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery

2024-05-22 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16819:
--

 Summary: CoordinatorRequestManager seems to return 0ms during the 
coordinator discovery
 Key: KAFKA-16819
 URL: https://issues.apache.org/jira/browse/KAFKA-16819
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


In KAFKA-15250 we discovered that the ConsumerNetworkThread loops without much 
backoff.  The in-flight check PR fixed much of it; however, during the 
coordinator discovery phase, CoordinatorRequestManager keeps returning 0 ms 
before the coordinator node is found.

 

The impact is minor, but we should expect the coordinator request manager to 
back off until the exponential backoff expires (so it should return around 100 ms).
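
A hypothetical sketch of the expected behavior (the names below are illustrative, not the actual CoordinatorRequestManager API): after a FindCoordinator attempt, the manager should report the remaining backoff, around the 100 ms retry interval, rather than 0:

```java
// Illustrative only: a poll-time backoff check for coordinator discovery.
class CoordinatorBackoff {
    static final long RETRY_BACKOFF_MS = 100;
    private long lastAttemptMs = Long.MIN_VALUE;

    /** Milliseconds to wait before the next attempt; 0 means "send now". */
    long msUntilNextAttempt(long nowMs) {
        if (lastAttemptMs == Long.MIN_VALUE)
            return 0; // no attempt yet, send immediately
        return Math.max(0, lastAttemptMs + RETRY_BACKOFF_MS - nowMs);
    }

    /** Record that a FindCoordinator request was just sent. */
    void onAttempt(long nowMs) {
        lastAttemptMs = nowMs;
    }
}
```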





[jira] [Created] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found

2024-05-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16799:
--

 Summary: NetworkClientDelegate is not backing off if the node is 
not found
 Key: KAFKA-16799
 URL: https://issues.apache.org/jira/browse/KAFKA-16799
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


While stress testing, I found that AsyncKafkaConsumer's network client 
delegate isn't backing off when the node is not ready, causing a large 
number of: 
{code:java}
 358 [2024-05-20 22:59:02,591] DEBUG [Consumer clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, groupId=consumer-groups-test-5] Node is not ready, handle the request in the next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null), request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=10, subscribedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, topicPartitions=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, node=Optional[b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null)], timer=org.apache.kafka.common.utils.Timer@649fffad} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code}
show up in the log.

What should happen instead is: 1. the node is not ready, 2. exponential 
backoff, 3. retry.
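
The not-ready/backoff/retry sequence could look like the following sketch (a standalone illustration, not the NetworkClientDelegate implementation), where each consecutive failure doubles the wait up to a cap and a success resets it:

```java
// Illustrative only: doubling backoff with a cap, reset on success.
class SimpleExponentialBackoff {
    private final long initialMs;
    private final long maxMs;
    private int failures = 0;

    SimpleExponentialBackoff(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    /** Backoff to apply for the current failure, doubling each time. */
    long nextBackoffMs() {
        long backoff = initialMs << Math.min(failures, 20); // clamp shift to avoid overflow
        failures++;
        return Math.min(backoff, maxMs);
    }

    /** Call when the node becomes ready again. */
    void reset() {
        failures = 0;
    }
}
```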





[jira] [Created] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition

2024-05-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16778:
--

 Summary: AsyncKafkaConsumer fetcher might occasionally try to 
fetch to a revoked partition
 Key: KAFKA-16778
 URL: https://issues.apache.org/jira/browse/KAFKA-16778
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


 
{code:java}
java.lang.IllegalStateException: No current assignment for partition 
output-topic-26
    at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
    at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542)
    at 
org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411)
    at 
org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143)
    at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at 
java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94)
 {code}
The setup: 30 consumers consuming from a 300-partition topic.

We can occasionally get an IllegalStateException from the consumer. 
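
One plausible guard, sketched with hypothetical names (not the actual AbstractFetch code): re-check the current assignment at fetch-preparation time and drop any partition revoked in the meantime, instead of assuming the candidate list is still valid:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative only: filter fetch candidates against the live assignment.
class FetchPreparer {
    /** Keep only partitions that are still assigned at preparation time. */
    static List<String> fetchablePartitions(List<String> candidates, Set<String> assigned) {
        return candidates.stream()
                .filter(assigned::contains)
                .collect(Collectors.toList());
    }
}
```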

 





[jira] [Resolved] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16474.

Resolution: Fixed

Ran the test several times; the client log also looks fine.

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  This 
> can cause the consumer to revoke its just-assigned partitions.  For example:
>  # The consumer first sends out a heartbeat with epoch=0 and memberId=''.
>  # The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus has not updated 
> its local state.
>  # The consumer receives the assignment from the first heartbeat and 
> reconciles it.
>  # Since the second heartbeat has epoch=0 and memberId='', the server thinks 
> this is a new member joining and therefore sends out an empty assignment.
>  # The consumer receives the response to the second heartbeat and revokes 
> all of its partitions.
> There are 2 issues associated with this bug:
>  # in-flight logic
>  # rapid poll: in KAFKA-16389 we've observed the consumer polling interval 
> to be a few ms.





[jira] [Resolved] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer

2024-04-19 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16579.

Resolution: Fixed

> Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
> -
>
> Key: KAFKA-16579
> URL: https://issues.apache.org/jira/browse/KAFKA-16579
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated 
> a slew of system tests to run both the "old" and "new" implementations. 
> KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} 
> so it could test the new consumer. However, the test is tailored specifically 
> to the "old" Consumer's protocol and assignment strategy upgrade.
> Unsurprisingly, when we run those system tests with the new 
> {{AsyncKafkaConsumer}}, we get errors like the following:
> {code}
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   29.634 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1)})}")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 41, in _verify_range_assignment
> "Mismatched assignment: %s" % assignment
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1)})}
> {code}
> The task here is to revert the changes made in KAFKA-16271.





[jira] [Resolved] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-04-18 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16389.

Resolution: Fixed

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch, consumer.log
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite 
> file.





[jira] [Created] (KAFKA-16474) AsyncKafkaConsumer might rapidly send out successive heartbeat causing partitions getting revoked

2024-04-04 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16474:
--

 Summary: AsyncKafkaConsumer might rapidly send out successive 
heartbeat causing partitions getting revoked
 Key: KAFKA-16474
 URL: https://issues.apache.org/jira/browse/KAFKA-16474
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


KAFKA-16389

We've discovered that in some uncommon cases, the consumer could send out 
successive heartbeats without waiting for the response to come back.  This 
can cause the consumer to revoke its just-assigned partitions.  For example:

 

 # The consumer first sends out a heartbeat with epoch=0 and memberId=''.
 # The consumer then rapidly sends out another heartbeat with epoch=0 and 
memberId='' because it has not gotten any response and thus has not updated 
its local state.
 # The consumer receives the assignment from the first heartbeat and 
reconciles it.
 # Since the second heartbeat has epoch=0 and memberId='', the server thinks 
this is a new member joining and therefore sends out an empty assignment.
 # The consumer receives the response to the second heartbeat and revokes 
all of its partitions.

 

There are 2 issues associated with this bug:
 # in-flight logic
 # rapid poll: in KAFKA-16389 we've observed the consumer polling interval to 
be a few ms.
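
The in-flight logic above could be sketched as follows (hypothetical names, not the actual HeartbeatRequestManager code): a new heartbeat is only allowed once the previous response has been applied, so the member id and epoch from the first response are in place before the next send:

```java
// Illustrative only: suppress a new heartbeat while one is still in flight.
class HeartbeatState {
    private boolean inFlight = false;
    String memberId = "";
    int memberEpoch = 0;

    /** A heartbeat may be sent only when no response is pending. */
    boolean canSendHeartbeat() {
        return !inFlight;
    }

    void onSend() {
        inFlight = true;
    }

    /** Apply the server's response before allowing the next send. */
    void onResponse(String memberId, int memberEpoch) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
        inFlight = false;
    }
}
```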





[jira] [Created] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16433:
--

 Summary: beginningOffsets and offsetsForTimes don't behave 
consistently when providing a zero timeout
 Key: KAFKA-16433
 URL: https://issues.apache.org/jira/browse/KAFKA-16433
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


As documented here: [https://github.com/apache/kafka/pull/15525]

 

Both APIs should at least send out a request when a zero timeout is provided.

 

This is corrected in the PR above; however, we still need to fix the 
implementation of the offsetsForTimes API.
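
A sketch of the intended zero-timeout contract (illustrative only, not the consumer's implementation): the request is issued unconditionally, and the zero timeout only skips the wait for the response:

```java
// Illustrative only: zero timeout skips the wait, not the send.
class ZeroTimeoutLookup {
    int requestsSent = 0;

    /** Returns the offset if it arrives within timeoutMs, else null. */
    Long lookup(long timeoutMs) {
        requestsSent++;       // always send, even when timeoutMs == 0
        if (timeoutMs == 0)
            return null;      // give up waiting, but the request went out
        return 42L;           // stand-in for a response arriving in time
    }
}
```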





[jira] [Created] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests

2024-03-22 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16405:
--

 Summary: Mismatch assignment error when running consumer rolling 
upgrade system tests
 Key: KAFKA-16405
 URL: https://issues.apache.org/jira/browse/KAFKA-16405
 Project: Kafka
  Issue Type: Task
  Components: consumer, system tests
Reporter: Philip Nee


Relevant to [https://github.com/apache/kafka/pull/15578].

 

We are seeing:
{code:java}

SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id:   2024-03-21--001
run time: 3 minutes 24.632 seconds
tests run:7
passed:   5
flaky:0
failed:   2
ignored:  0

test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic
status: PASS
run time:   24.599 seconds

test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   26.638 seconds


AssertionError("Mismatched assignment: {frozenset(), 
frozenset({TopicPartition(topic='test_topic', partition=3), 
TopicPartition(topic='test_topic', partition=0), 
TopicPartition(topic='test_topic', partition=1), 
TopicPartition(topic='test_topic', partition=2)})}")
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", 
line 77, in rolling_update_test
self._verify_range_assignment(consumer)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", 
line 38, in _verify_range_assignment
assert assignment == set([
AssertionError: Mismatched assignment: {frozenset(), 
frozenset({TopicPartition(topic='test_topic', partition=3), 
TopicPartition(topic='test_topic', partition=0), 
TopicPartition(topic='test_topic', partition=1), 
TopicPartition(topic='test_topic', partition=2)})}


test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
status: PASS
run time:   29.815 seconds

test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
status: PASS
run time:   29.766 seconds

test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic
status: PASS
run time:   30.086 seconds

test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   35.965 seconds


AssertionError("Mismatched assignment: {frozenset(), 
frozenset({TopicPartition(topic='test_topic', partition=3), 
TopicPartition(topic='test_topic', partition=0), 
TopicPartition(topic='test_topic', partition=1), 
TopicPartition(topic='test_topic', partition=2)})}")
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", 
line 77, in rolling_update_test
self._verify_range_assignment(consumer)
  File 

[jira] [Created] (KAFKA-16390) consumer_bench_test.py failed using AsyncKafkaConsumer

2024-03-19 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16390:
--

 Summary: consumer_bench_test.py failed using AsyncKafkaConsumer
 Key: KAFKA-16390
 URL: https://issues.apache.org/jira/browse/KAFKA-16390
 Project: Kafka
  Issue Type: Task
  Components: consumer, system tests
Reporter: Philip Nee


Ran the system test based on KAFKA-16273

The following tests failed using the consumer group protocol
{code:java}
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer

kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer

kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
 {code}
Because of
{code:java}
 TimeoutError('consume_workload failed to finish in the expected amount of 
time.')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
    data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", line 
146, in test_single_partition
    consume_workload.wait_for_done(timeout_sec=180)
  File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 352, 
in wait_for_done
    wait_until(lambda: self.done(),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: consume_workload failed to finish in the expected 
amount of time. {code}





[jira] [Resolved] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-02-28 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16116.

Resolution: Fixed

> AsyncKafkaConsumer: Add missing rebalance metrics
> -
>
> Key: KAFKA-16116
> URL: https://issues.apache.org/jira/browse/KAFKA-16116
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
> |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
> |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
> |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
> |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
> |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
> |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|





[jira] [Resolved] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-02-02 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16115.

Resolution: Fixed

> AsyncKafkaConsumer: Add missing heartbeat metrics
> -
>
> Key: KAFKA-16115
> URL: https://issues.apache.org/jira/browse/KAFKA-16115
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.8.0
>
>
> The following metrics are missing:
> |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
> |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
> |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
> |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
> |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|





[jira] [Resolved] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-19 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16113.

Resolution: Fixed

> AsyncKafkaConsumer: Add missing offset commit metrics
> -
>
> Key: KAFKA-16113
> URL: https://issues.apache.org/jira/browse/KAFKA-16113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, metrics
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total





[jira] [Created] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-01-17 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16160:
--

 Summary: AsyncKafkaConsumer is trying to connect to a disconnected 
node in a tight loop
 Key: KAFKA-16160
 URL: https://issues.apache.org/jira/browse/KAFKA-16160
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee


While running AsyncKafkaConsumer, we observed excessive logging of:
{code:java}
1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, groupId=concurrent_consumer] Node is not ready, handle the request in the next event loop: node=worker4:9092 (id: 2147483644 rack: null), request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, node=Optional[worker4:9092 (id: 2147483644 rack: null)], timer=org.apache.kafka.common.utils.Timer@55ed4733} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
This seems to be triggered by a tight poll loop in the network thread.  The 
right thing to do is to back off a bit for that given node and retry later.
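
A minimal sketch of the suggested fix (hypothetical names, not from the Kafka codebase): track a per-node "retry after" timestamp so the network thread skips a not-ready node until its backoff elapses:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: per-node suppression of retries to a not-ready node.
class NodeBackoffTracker {
    private final long backoffMs;
    private final Map<Integer, Long> retryAfter = new HashMap<>();

    NodeBackoffTracker(long backoffMs) {
        this.backoffMs = backoffMs;
    }

    /** Returns true if we may attempt to send to this node now. */
    boolean canAttempt(int nodeId, long nowMs) {
        return nowMs >= retryAfter.getOrDefault(nodeId, 0L);
    }

    /** Called when the node reports "not ready": suppress retries for backoffMs. */
    void onNodeNotReady(int nodeId, long nowMs) {
        retryAfter.put(nodeId, nowMs + backoffMs);
    }
}
```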





[jira] [Created] (KAFKA-16159) Prune excessive logging from Telemetry Reporter

2024-01-17 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16159:
--

 Summary: Prune excessive logging from Telemetry Reporter
 Key: KAFKA-16159
 URL: https://issues.apache.org/jira/browse/KAFKA-16159
 Project: Kafka
  Issue Type: Task
  Components: consumer, log
Reporter: Philip Nee
Assignee: Apoorv Mittal


While running system tests locally, I've noticed excessive logging from the 
Telemetry Reporter.  I believe this was introduced in KIP-714.
{code:java}
[2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
returning the value 224678 ms; the client will wait before submitting the next 
GetTelemetrySubscriptions network API request 
(org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
This is logged several times per ms.  Also, given the amount of log being 
emitted, can we check the CPU profile to see if there's a process running 
a tight loop?
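
One way to prune such repeats, sketched with hypothetical names (not the ClientTelemetryReporter API): rate-limit the debug line so identical messages within a quiet interval are dropped:

```java
// Illustrative only: drop repeats of a log line within a quiet interval.
class RateLimitedLogger {
    private final long intervalMs;
    private long lastLoggedMs = Long.MIN_VALUE;
    int emitted = 0;

    RateLimitedLogger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    void maybeLog(String msg, long nowMs) {
        if (lastLoggedMs != Long.MIN_VALUE && nowMs - lastLoggedMs < intervalMs)
            return; // suppress the repeat
        lastLoggedMs = nowMs;
        emitted++;
        // the real logger call would go here, e.g. log.debug(msg)
    }
}
```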





[jira] [Resolved] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones

2024-01-12 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16112.

Resolution: Fixed

These are the results of this ticket:

 
|KAFKA-16113|
|KAFKA-16116|
|KAFKA-16115|

> Review JMX metrics in Async Consumer and determine the missing ones
> ---
>
> Key: KAFKA-16112
> URL: https://issues.apache.org/jira/browse/KAFKA-16112
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>






[jira] [Created] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics

2024-01-11 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16116:
--

 Summary: AsyncKafkaConsumer: Add missing rebalance metrics
 Key: KAFKA-16116
 URL: https://issues.apache.org/jira/browse/KAFKA-16116
 Project: Kafka
  Issue Type: Improvement
Reporter: Philip Nee
Assignee: Philip Nee


The following metrics are missing:
|[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]|
|[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]|
|[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]|
|[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]|
|[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]|
|[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]|
|[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]|





[jira] [Created] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics

2024-01-11 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16115:
--

 Summary: AsyncKafkaConsumer: Add missing heartbeat metrics
 Key: KAFKA-16115
 URL: https://issues.apache.org/jira/browse/KAFKA-16115
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, metrics
Reporter: Philip Nee
Assignee: Philip Nee


The following metrics are missing:
|[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]|
|[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]|
|[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]|
|[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]|
|[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16113:
--

 Summary: AsyncKafkaConsumer: Add missing offset commit metrics
 Key: KAFKA-16113
 URL: https://issues.apache.org/jira/browse/KAFKA-16113
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


The following metrics are missing from the AsyncKafkaConsumer:

commit-latency-avg
commit-latency-max
commit-rate
commit-total
committed-time-ns-total



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15250) DefaultBackgroundThread is running tight loop

2024-01-08 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15250.

Fix Version/s: (was: 3.8.0)
   Resolution: Fixed

> DefaultBackgroundThread is running tight loop
> -
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The DefaultBackgroundThread is running a tight loop and wasting CPU cycles.  I 
> think we need to re-examine the timeout passed to NetworkClientDelegate.poll.
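The direction the ticket hints at can be sketched as choosing the network thread's poll timeout from the earliest time any request manager wants to run again, instead of spinning with a zero timeout. This is only an illustrative sketch; the class and method names are hypothetical, not the actual client code.

```java
import java.util.List;

public class PollTimeout {
    // timesUntilNextAttemptMs: remaining backoff per request manager, in ms.
    // The network thread can safely sleep until the earliest one is ready.
    static long pollTimeoutMs(List<Long> timesUntilNextAttemptMs, long maxTimeoutMs) {
        long timeout = maxTimeoutMs;
        for (long t : timesUntilNextAttemptMs) {
            timeout = Math.min(timeout, Math.max(t, 0));
        }
        return timeout;
    }

    public static void main(String[] args) {
        // Two managers ready in 20ms and 50ms: sleep 20ms rather than spin.
        System.out.println(pollTimeoutMs(List.of(20L, 50L), 1000L)); // prints 20
    }
}
```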



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown

2023-12-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15948.

  Assignee: Philip Nee  (was: Phuc Hong Tran)
Resolution: Fixed

> Refactor AsyncKafkaConsumer shutdown
> 
>
> Key: KAFKA-15948
> URL: https://issues.apache.org/jira/browse/KAFKA-15948
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Upon closing we need a round trip from the network thread to the application 
> thread and then back to the network thread to complete the callback 
> invocation.  Currently, we don't have any of that.  I think we need to 
> refactor our closing mechanism.  There are a few points to the refactor:
>  # The network thread should know if there's a custom user callback to 
> trigger or not.  If there is, it should wait for the callback completion to 
> send a leave group.  If not, it should proceed with the shutdown.
>  # The application thread sends a closing signal to the network thread and 
> continuously polls the background event handler until time runs out.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id

2023-12-19 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16034:
--

 Summary: AsyncKafkaConsumer will get Invalid Request error when 
trying to rejoin on fenced/unknown member Id
 Key: KAFKA-16034
 URL: https://issues.apache.org/jira/browse/KAFKA-16034
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


The consumer logs an invalid request error when rejoining with a fenced/unknown 
member id because we didn't reset the HeartbeatState, so we don't send the 
required fields (rebalanceTimeoutMs, for example) when rejoining.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership

2023-12-18 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16027:
--

 Summary: Refactor MetadataTest#testUpdatePartitionLeadership
 Key: KAFKA-16027
 URL: https://issues.apache.org/jira/browse/KAFKA-16027
 Project: Kafka
  Issue Type: Improvement
Reporter: Philip Nee


MetadataTest#testUpdatePartitionLeadership is extremely long.  I think it is 
pretty close to the 160-line method limit.  It also covers two distinct 
scenarios, so it is best to split it into two separate tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16026) AsyncConsumer does not send a poll event to the background thread

2023-12-18 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16026:
--

 Summary: AsyncConsumer does not send a poll event to the 
background thread
 Key: KAFKA-16026
 URL: https://issues.apache.org/jira/browse/KAFKA-16026
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee


Consumer poll() does not send a poll event to the background thread to:
 # trigger autocommit
 # reset the max poll interval timer

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16024) SaslPlaintextConsumerTest#testCoordinatorFailover is flaky

2023-12-17 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16024:
--

 Summary: SaslPlaintextConsumerTest#testCoordinatorFailover is flaky
 Key: KAFKA-16024
 URL: https://issues.apache.org/jira/browse/KAFKA-16024
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


The test is flaky with the async consumer, as we are observing:

 
{code:java}
org.opentest4j.AssertionFailedError: Failed to observe commit callback before 
timeout{code}
I was not able to replicate this easily on my local machine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16023) PlaintextConsumerTest needs to wait for reconciliation to complete before proceeding

2023-12-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16023:
--

 Summary: PlaintextConsumerTest needs to wait for reconciliation to 
complete before proceeding
 Key: KAFKA-16023
 URL: https://issues.apache.org/jira/browse/KAFKA-16023
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


Several tests in PlaintextConsumerTest.scala (such as 
testPerPartitionLagMetricsCleanUpWithSubscribe) use:

assertEquals(1, listener.callsToAssigned, "should be assigned once")

However, the timing of reconciliation completion is not deterministic due to 
asynchronous processing, so we actually need to wait until the condition is 
met.
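The "wait for the condition" pattern the ticket suggests can be sketched as follows. The helper name and timeout values are illustrative, not the actual test utility.

```java
import java.util.function.BooleanSupplier;

public class WaitUntil {
    // Poll the condition until it holds or the timeout elapses, instead of
    // asserting once and racing the asynchronous reconciliation.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) return false;
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        final long start = System.currentTimeMillis();
        // Condition becomes true after ~50ms, mimicking async assignment.
        boolean ok = waitUntilTrue(() -> System.currentTimeMillis() - start > 50, 1000, 10);
        System.out.println(ok); // prints true
    }
}
```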



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16022) AsyncKafkaConsumer sometimes complains "No current assignment for partition {}"

2023-12-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16022:
--

 Summary: AsyncKafkaConsumer sometimes complains "No current 
assignment for partition {}"
 Key: KAFKA-16022
 URL: https://issues.apache.org/jira/browse/KAFKA-16022
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


This seems to be a timing issue: before the member receives any assignment 
from the coordinator, the fetcher tries to find the current position, causing 
"No current assignment for partition {}".  This adds a small amount of noise 
to the log.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16019) Some of the tests in PlaintextConsumer can't seem to deterministically invoke and verify the consumer callback

2023-12-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16019:
--

 Summary: Some of the tests in PlaintextConsumer can't seem to 
deterministically invoke and verify the consumer callback
 Key: KAFKA-16019
 URL: https://issues.apache.org/jira/browse/KAFKA-16019
 Project: Kafka
  Issue Type: Task
Reporter: Philip Nee


I was running the PlaintextConsumer tests against the async consumer; however, 
a few tests failed because they could not verify that the listener was invoked 
correctly.

For example, testPerPartitionLeadMetricsCleanUpWithSubscribe:

Around 50% of the time, the listener's callsToAssigned was never incremented 
correctly.  Even after changing the assertion to waitUntilTrue, the result was 
the same:
consumer.subscribe(List(topic, topic2).asJava, listener)
val records = awaitNonEmptyRecords(consumer, tp)
assertEquals(1, listener.callsToAssigned, "should be assigned once")



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15993) Enable integration tests that rely on rebalance listeners

2023-12-11 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15993:
--

 Summary: Enable integration tests that rely on rebalance listeners
 Key: KAFKA-15993
 URL: https://issues.apache.org/jira/browse/KAFKA-15993
 Project: Kafka
  Issue Type: Task
Reporter: Philip Nee


We will enable integration tests using the async consumer in KAFKA-15971.  
However, we should also enable tests that rely on rebalance listeners after 
KAFKA-15628 is closed.  One example is testMaxPollIntervalMs, which relies on 
the listener to verify correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown

2023-11-29 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15948:
--

 Summary: Refactor AsyncKafkaConsumer shutdown
 Key: KAFKA-15948
 URL: https://issues.apache.org/jira/browse/KAFKA-15948
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee


Upon closing we need a round trip from the network thread to the application 
thread and then back to the network thread to complete the callback invocation. 
 Currently, we don't have any of that.  I think we need to refactor our closing 
mechanism.  There are a few points to the refactor:
 # The network thread should know if there's a custom user callback to trigger 
or not.  If there is, it should wait for the callback completion to send a 
leave group.  If not, it should proceed with the shutdown.
 # The application thread sends a closing signal to the network thread and 
continuously polls the background event handler until time runs out.
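The second point above, the application thread draining background events until the close timer runs out, can be sketched roughly as below. The queue and event types are stand-ins, not the actual consumer internals.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CloseLoop {
    // Drain background events (e.g. callback-invocation events handed over by
    // the network thread) until the close deadline passes.
    static int drainUntil(BlockingQueue<Runnable> backgroundEvents, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int processed = 0;
        while (System.currentTimeMillis() < deadline) {
            Runnable event;
            try {
                event = backgroundEvents.poll(10, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (event == null) continue;
            event.run(); // e.g. invoke the user's callback on this thread
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> events = new LinkedBlockingQueue<>();
        events.add(() -> {});
        events.add(() -> {});
        System.out.println(drainUntil(events, 200)); // prints 2
    }
}
```

In the real refactor the network thread would only send the leave-group heartbeat after the drained callback events complete, per point 1.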

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of autoretry

2023-11-29 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15946:
--

 Summary: AsyncKafkaConsumer should retry commits on the 
application thread instead of autoretry
 Key: KAFKA-15946
 URL: https://issues.apache.org/jira/browse/KAFKA-15946
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee


The original design was that the network thread always completes the future, 
whether it succeeds or fails.  However, in the current patch, I mistakenly 
added auto-retry functionality because commitSync wasn't retrying.  What we 
should do instead is have the commitSync API catch RetriableExceptions and 
resend the commit until it times out.

```
if (error.exception() instanceof RetriableException) {
    log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
    handleRetriableError(error, response);
    retry(responseTime);  // <--- We probably shouldn't do this.
    return;
}
```

 

and 

 

```
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
    acquireAndEnsureOpen();
    long commitStart = time.nanoseconds();
    try {
        CompletableFuture<Void> commitFuture = commit(offsets, true); // <-- we probably should retry here
        ConsumerUtils.getResult(commitFuture, time.timer(timeout));
    } finally {
        wakeupTrigger.clearTask();
        kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
        release();
    }
}
```
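The proposed application-thread retry loop can be sketched as below. This is a simplified stand-in, not the actual client code: RetriableException and the attempt interface are local stubs, and the backoff is a fixed sleep rather than the client's exponential backoff.

```java
public class CommitRetry {
    static class RetriableException extends RuntimeException {}

    interface CommitAttempt { void commit() throws RetriableException; }

    // Retry a retriable commit failure on the calling (application) thread
    // until the caller's deadline passes; rethrow once time runs out.
    static int commitSync(CommitAttempt attempt, long timeoutMs, long backoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.commit();
                return attempts; // success: report how many tries it took
            } catch (RetriableException e) {
                if (System.currentTimeMillis() + backoffMs > deadline) throw e;
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] failures = {2}; // first two attempts fail with a retriable error
        int attempts = commitSync(() -> {
            if (failures[0]-- > 0) throw new RetriableException();
        }, 1000, 10);
        System.out.println(attempts); // prints 3
    }
}
```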



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15942) Implement ConsumerInterceptor

2023-11-29 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15942:
--

 Summary: Implement ConsumerInterceptor
 Key: KAFKA-15942
 URL: https://issues.apache.org/jira/browse/KAFKA-15942
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Philip Nee


As title, we need to implement ConsumerInterceptor in the AsyncKafkaConsumer

 

This is the current code. The implementation would be very similar
{code:java}
if (interceptors != null)
interceptors.onCommit(offsets); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15913) Remove excessive use of spy in ConsumerTestBuilder

2023-11-28 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15913:
--

 Summary: Remove excessive use of spy in ConsumerTestBuilder
 Key: KAFKA-15913
 URL: https://issues.apache.org/jira/browse/KAFKA-15913
 Project: Kafka
  Issue Type: Improvement
Reporter: Philip Nee


ConsumerTestBuilder is meant to be a unit testing utility; however, we seem to 
use Mockito#spy quite liberally.  This is not the right testing strategy 
because it essentially turns unit tests into integration tests.

 

While the current unit tests run fine, we should probably use Mockito#mock by 
default and test each dependency independently.

 

The ask here is:
 # Use mock(class) by default
 # Provide a more flexible interface for the test builder that lets the user 
configure a spy or a mock, or pass in their own mock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15872) Investigate autocommit retry logic

2023-11-21 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15872:
--

 Summary: Investigate autocommit retry logic
 Key: KAFKA-15872
 URL: https://issues.apache.org/jira/browse/KAFKA-15872
 Project: Kafka
  Issue Type: Sub-task
Reporter: Philip Nee


This is purely an investigation ticket.

Currently, we send an autocommit only if there isn't an inflight one; however, 
this logic might not be correct because I think we should:
 # expire the request if it is not completed in time
 # always send an autocommit on the interval clock
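The two behaviours above can be sketched with a small state holder: an inflight autocommit that outlives the request timeout is expired, and sends happen on the interval clock. All names and the shape of the API are illustrative, not the actual CommitRequestManager.

```java
public class AutoCommitState {
    private final long intervalMs;
    private final long requestTimeoutMs;
    private long nextSendMs;
    private long inflightSentMs = -1; // -1 means no inflight autocommit

    AutoCommitState(long intervalMs, long requestTimeoutMs) {
        this.intervalMs = intervalMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.nextSendMs = intervalMs;
    }

    boolean shouldSend(long nowMs) {
        // (1) expire an inflight request that outlived the request timeout
        if (inflightSentMs >= 0 && nowMs - inflightSentMs > requestTimeoutMs)
            inflightSentMs = -1;
        // (2) send on the interval clock when nothing is inflight
        return inflightSentMs < 0 && nowMs >= nextSendMs;
    }

    void onSend(long nowMs) {
        inflightSentMs = nowMs;
        nextSendMs = nowMs + intervalMs;
    }

    public static void main(String[] args) {
        AutoCommitState s = new AutoCommitState(5_000, 30_000);
        System.out.println(s.shouldSend(5_000));  // true: interval elapsed
        s.onSend(5_000);
        System.out.println(s.shouldSend(10_000)); // false: still inflight
        System.out.println(s.shouldSend(40_000)); // true: inflight expired
    }
}
```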



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records

2023-11-21 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15836.

Resolution: Fixed

PR merged.

> KafkaConsumer subscribes to multiple topics does not respect max.poll.records
> -
>
> Key: KAFKA-15836
> URL: https://issues.apache.org/jira/browse/KAFKA-15836
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Philip Nee
>Assignee: Andrew Schofield
>Priority: Blocker
>  Labels: consumer
> Fix For: 3.7.0
>
>
> We discovered that when KafkaConsumer subscribes to multiple topics with 
> max.poll.records configured, max.poll.records is not properly respected 
> for every poll() invocation.
>  
> I was able to reproduce it with the AK example, here is how I ran my tests:
> [https://github.com/apache/kafka/pull/14772]
>  
> 1. start zookeeper and kafka server (or kraft mode should be fine too)
> 2. Run: examples/bin/java-producer-consumer-demo.sh 1000
> 3. Polled records > 400 will be printed to stdout
>  
> Here is what the program does:
> The producer produces a large number of records to multiple topics.  We 
> configure the consumer with max.poll.records = 400 and subscribe to 
> multiple topics.  The consumer polls, and the number of returned records 
> can sometimes be larger than 400.
>  
> This is an issue in AK 3.6 but 3.5 was fine.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?

2023-11-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15867:
--

 Summary: Should ConsumerNetworkThread wrap the exception and 
notify the polling thread?
 Key: KAFKA-15867
 URL: https://issues.apache.org/jira/browse/KAFKA-15867
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee


The ConsumerNetworkThread runs an infinite tight loop.  However, when it 
encounters an unexpected exception, it logs an error and continues.

 

I think this is not ideal because users can run blind for a long time before 
discovering something is wrong with the code, so I believe we should propagate 
the throwable back to the polling thread.

 

cc [~lucasbru] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState

2023-11-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15866:
--

 Summary: Refactor OffsetFetchRequestState Error handling to be 
more consistent with OffsetCommitRequestState
 Key: KAFKA-15866
 URL: https://issues.apache.org/jira/browse/KAFKA-15866
 Project: Kafka
  Issue Type: Improvement
Reporter: Philip Nee


The current OffsetFetchRequestState error handling uses nested if-else, which 
is stylistically quite different from OffsetCommitRequestState's switch 
statement.  The latter is more readable, so we should refactor the error 
handling to use the same style.

 

A minor point: some of the error handling seems inconsistent with the commit 
path.  The logic comes from the current implementation, so we should also 
review all of the error handling.  For example, the current logic doesn't mark 
the coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE
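The switch style the ticket prefers can be illustrated as below. This is only a sketch: the Error enum is a local stand-in mirroring a few of Kafka's error codes, not the real Errors enum, and the string results stand in for the actual handler actions.

```java
public class ErrorHandling {
    enum Error { NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, UNKNOWN_SERVER_ERROR }

    // One flat switch per error code, instead of nested if-else branches.
    static String handle(Error error) {
        switch (error) {
            case NONE:
                return "complete";
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // both coordinator errors share one branch: mark it unknown, retry
                return "mark-coordinator-unknown-and-retry";
            default:
                return "fail";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(Error.COORDINATOR_NOT_AVAILABLE)); // prints mark-coordinator-unknown-and-retry
    }
}
```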



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback

2023-11-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15865:
--

 Summary: Ensure consumer.poll() execute autocommit callback
 Key: KAFKA-15865
 URL: https://issues.apache.org/jira/browse/KAFKA-15865
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


When the network thread completes autocommits, we need to send a message/event 
to the application thread so it can execute the callback.  In KAFKA-15327, the 
network thread sends an AutoCommitCompletionBackgroundEvent to the polling 
thread.  The polling thread should trigger the OffsetCommitCallback upon 
receiving it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15174) Ensure the correct thread is executing the callbacks

2023-11-20 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15174.

Resolution: Fixed

> Ensure the correct thread is executing the callbacks
> 
>
> Key: KAFKA-15174
> URL: https://issues.apache.org/jira/browse/KAFKA-15174
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> We need to add assertion tests to ensure the correct thread is executing the 
> offset commit callbacks and rebalance callback



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15836) KafkaConsumer subscribe to multiple topics does not respect max.poll.records

2023-11-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15836:
--

 Summary: KafkaConsumer subscribe to multiple topics does not 
respect max.poll.records
 Key: KAFKA-15836
 URL: https://issues.apache.org/jira/browse/KAFKA-15836
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.0
Reporter: Philip Nee
Assignee: Kirk True


We discovered that when KafkaConsumer subscribes to multiple topics with 
max.poll.records configured, max.poll.records is not properly respected for 
every poll() invocation.

 

I was able to reproduce it with the AK example, here is how I ran my tests:

[https://github.com/apache/kafka/pull/14772]

 

1. start zookeeper and kafka server (or kraft mode should be fine too)

2. Run: examples/bin/java-producer-consumer-demo.sh 1000

3. Polled records > 400 will be printed to stdout

 

Here is what the program does:

The producer produces a large number of records to multiple topics.  We 
configure the consumer with max.poll.records = 400 and subscribe to multiple 
topics.  The consumer polls, and the number of returned records can sometimes 
be larger than 400.

 

This is an issue in AK 3.6 but 3.5 was fine.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15818) Implement max poll interval

2023-11-13 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15818:
--

 Summary: Implement max poll interval
 Key: KAFKA-15818
 URL: https://issues.apache.org/jira/browse/KAFKA-15818
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee


In the network thread, we need a timer configured with max.poll.interval.ms.  
The reason is that if the user doesn't poll the consumer within the interval, 
the member needs to leave the group.

 

Currently, we send an acknowledgement event to the network thread per poll.  
It needs to do two things: 1. update the autocommit state 2. reset the max 
poll interval timer

 

The current logic looks like this:
{code:java}
 if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has 
stalled
// in between calls to poll().
log.warn("consumer poll timeout has expired. This means the time between 
subsequent calls to poll() " +
"was longer than the configured max.poll.interval.ms, which typically 
implies that " +
"the poll loop is spending too much time processing messages. You can 
address this " +
"either by increasing max.poll.interval.ms or by reducing the maximum 
size of batches " +
"returned in poll() with max.poll.records.");

maybeLeaveGroup("consumer poll timeout has expired.");
} {code}
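The network-thread timer described above can be sketched as a small state holder: each poll event from the application thread resets it, and expiry means the member should leave the group. Names are hypothetical, not the actual heartbeat manager code.

```java
public class PollIntervalTimer {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollIntervalTimer(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Called when the application thread's poll acknowledgement event arrives.
    void onPollEvent(long nowMs) { lastPollMs = nowMs; }

    // When this returns true, the member should proactively leave the group.
    boolean expired(long nowMs) { return nowMs - lastPollMs > maxPollIntervalMs; }

    public static void main(String[] args) {
        PollIntervalTimer timer = new PollIntervalTimer(300_000, 0); // 5 min interval
        timer.onPollEvent(100_000);
        System.out.println(timer.expired(350_000)); // 250s since last poll: false
        System.out.println(timer.expired(450_000)); // 350s since last poll: true
    }
}
```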



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15803) Update last seen epoch during commit

2023-11-09 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15803:
--

 Summary: Update last seen epoch during commit
 Key: KAFKA-15803
 URL: https://issues.apache.org/jira/browse/KAFKA-15803
 Project: Kafka
  Issue Type: Task
Reporter: Philip Nee
Assignee: Philip Nee


At the time we implemented commitAsync in the PrototypeAsyncConsumer, metadata 
was not available.  The ask here is to investigate whether we need to add the 
following function to the commit code:

{code:java}
private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
    if (offsetAndMetadata != null)
        offsetAndMetadata.leaderEpoch().ifPresent(epoch ->
            metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15686) Consumer should be able to detect network problem

2023-11-06 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15686.

  Assignee: Philip Nee
Resolution: Won't Fix

The user can use the admin client to get the partition leader information. 

> Consumer should be able to detect network problem
> -
>
> Key: KAFKA-15686
> URL: https://issues.apache.org/jira/browse/KAFKA-15686
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Jiahongchao
>Assignee: Philip Nee
>Priority: Minor
>
> When we call poll method in consumer, it will return normally even if some 
> partitions do not have a leader.
> What should we do to detect such failures? Currently we have to check log to 
> find out broker connection problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15775) Implement listTopics() and partitionFor() for the AsyncKafkaConsumer

2023-11-02 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15775:
--

 Summary: Implement listTopics() and partitionFor() for the 
AsyncKafkaConsumer
 Key: KAFKA-15775
 URL: https://issues.apache.org/jira/browse/KAFKA-15775
 Project: Kafka
  Issue Type: Task
Reporter: Philip Nee


{code:java}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
    return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
}

@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
    throw new KafkaException("method not implemented");
}

@Override
public Map<String, List<PartitionInfo>> listTopics() {
    return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
}

@Override
public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
    throw new KafkaException("method not implemented");
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15679) Client support for new consumer configs

2023-11-02 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15679.

Resolution: Fixed

> Client support for new consumer configs
> ---
>
> Key: KAFKA-15679
> URL: https://issues.apache.org/jira/browse/KAFKA-15679
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
>
> New consumer should support the new configs introduced by KIP-848
> |group.protocol|enum|generic|A flag which indicates if the new protocol 
> should be used or not. It could be: generic or consumer|
> |group.remote.assignor|string|null|The server side assignor to use. It cannot 
> be used in conjunction with group.local.assignor. {{null}} means that the 
> choice of the assignor is left to the group coordinator.|
> The protocol introduces a 3rd property for client side (local) assignors, but 
> that will be introduced later on. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15531) Ensure coordinator node is removed upon disconnection exception

2023-11-02 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15531.

Resolution: Fixed

> Ensure coordinator node is removed upon disconnection exception
> ---
>
> Key: KAFKA-15531
> URL: https://issues.apache.org/jira/browse/KAFKA-15531
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> In the async consumer, the coordinator isn't being removed when receiving the 
> following exception:
>  
> {code:java}
> if (e instanceof DisconnectException) {
>   markCoordinatorUnknown(true, e.getMessage());
> }{code}
>  
> This should happen on all requests going to coordinator node:
> 1. heartbeat 2. offset fetch/commit 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks

2023-11-02 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15316.

Resolution: Fixed

> CommitRequestManager not calling RequestState callbacks 
> 
>
> Key: KAFKA-15316
> URL: https://issues.apache.org/jira/browse/KAFKA-15316
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> CommitRequestManager is not triggering the RequestState callbacks that update 
> {_}lastReceivedMs{_}, affecting the _canSendRequest_ verification of the 
> RequestState



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types

2023-11-02 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15562.

Resolution: Fixed

> Ensure fetch offset and commit offset handler handles both timeout and 
> various error types
> --
>
> Key: KAFKA-15562
> URL: https://issues.apache.org/jira/browse/KAFKA-15562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> Both fetchOffsetRequest and commitOffsetRequest handlers don't have 
> sufficient logic to handle timeout exception.
>  
> CommitOffsetRequest handler also doesn't handle various of server error such 
> as coordinator not found. We need to handle:
> If Exception is non null:
>  - handle RetriableError that respects requestTimeoutMs
>  - handle NonRetriableError
>  
> If the response contains error, ensure to:
>  - mark coordinator unknown if needed
>  - retry if needed
>  - fail the request



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15773) Group protocol configuration should be validated

2023-11-02 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15773:
--

 Summary: Group protocol configuration should be validated
 Key: KAFKA-15773
 URL: https://issues.apache.org/jira/browse/KAFKA-15773
 Project: Kafka
  Issue Type: Improvement
Reporter: Philip Nee
Assignee: Philip Nee


If the user specifies the generic group protocol, or does not specify the 
group.protocol config at all, we should reject any group.remote.assignor 
setting.

 

If group.local.assignor and group.remote.assignor are both configured, we 
should also reject the configuration.

 

This is an optimization/user experience improvement.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15631) Do not send new heartbeat request while another one in-flight

2023-10-30 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15631.

Resolution: Not A Problem

> Do not send new heartbeat request while another one in-flight
> -
>
> Key: KAFKA-15631
> URL: https://issues.apache.org/jira/browse/KAFKA-15631
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Client consumer should not send a new heartbeat request while there is a 
> previous in-flight. If a HB is in-flight, we should wait for a response or 
> timeout before sending a next one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15550) OffsetsForTimes validation for negative timestamps in new consumer

2023-10-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15550.

Resolution: Fixed

> OffsetsForTimes validation for negative timestamps in new consumer
> --
>
> Key: KAFKA-15550
> URL: https://issues.apache.org/jira/browse/KAFKA-15550
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, kip-848-preview
>
> The offsetsForTimes API call should fail with _IllegalArgumentException_ if 
> negative timestamps are provided as arguments. This will effectively exclude 
> earliest and latest offsets as target times, keeping the current behaviour of 
> the KafkaConsumer.
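The argument check amounts to rejecting any negative target timestamp up front; this helper and its map shape are simplified stand-ins for the actual consumer code path:

```java
import java.util.Map;

class OffsetsForTimesValidation {
    // Hypothetical pre-flight check: partition name -> target timestamp.
    static void checkTimestamps(Map<String, Long> timestampsToSearch) {
        for (Map.Entry<String, Long> e : timestampsToSearch.entrySet()) {
            if (e.getValue() < 0) {
                throw new IllegalArgumentException(
                    "Negative target timestamp " + e.getValue()
                        + " for partition " + e.getKey());
            }
        }
    }
}
```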



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15697) Add local assignor and ensure it cannot be used with server side assignor

2023-10-26 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15697:
--

 Summary: Add local assignor and ensure it cannot be used with 
server side assignor
 Key: KAFKA-15697
 URL: https://issues.apache.org/jira/browse/KAFKA-15697
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee


When we start supporting a local/client-side assignor, we should:
 # Add the config to ConsumerConfig
 # Determine where to implement the logic ensuring it is not used alongside 
the server-side assignor, i.e. you can specify only a local assignor, only a 
remote assignor, or neither.
 ## If both assignors are specified: throw an IllegalArgumentException



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15642) Ensure offset fetcher behaves correctly when the request is Timeout

2023-10-18 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15642:
--

 Summary: Ensure offset fetcher behaves correctly when the request 
is Timeout
 Key: KAFKA-15642
 URL: https://issues.apache.org/jira/browse/KAFKA-15642
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee
Assignee: Philip Nee


I need to test the behavior of the OffsetFetcher when the request times out. It 
seems like we should continue to retry until the top-level timer expires.
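The intended behavior can be sketched as a retry loop bounded by a caller-supplied deadline; the attempt() supplier stands in for the offset fetch and the clock is injected for illustration:

```java
import java.util.function.Supplier;

class BoundedRetry {
    // Retry attempt() until it succeeds or the top-level deadline passes.
    static boolean retryUntil(long deadlineMs, Supplier<Boolean> attempt,
                              Supplier<Long> clock) {
        while (clock.get() < deadlineMs) {
            if (attempt.get()) return true;  // request succeeded
            // a per-request timeout falls through here and we retry
        }
        return false;  // top-level timer expired
    }
}
```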



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15592) Member does not need to always try to join a group when a groupId is configured

2023-10-11 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15592:
--

 Summary: Member does not need to always try to join a group when a 
groupId is configured
 Key: KAFKA-15592
 URL: https://issues.apache.org/jira/browse/KAFKA-15592
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Currently, instantiating a MembershipManager means the member will always seek 
to join a group unless it has failed fatally. However, this is not always 
desired, because the member should be able to join and leave a group at any time 
during its life cycle. Maybe we should include an "inactive" state in the state 
machine, indicating that the member does not want to be in a group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced

2023-10-11 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15588:
--

 Summary: Purge the unsent offset commits/fetches when the member 
is fenced
 Key: KAFKA-15588
 URL: https://issues.apache.org/jira/browse/KAFKA-15588
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee
Assignee: Philip Nee


When the member is fenced or failed, we should purge the in-flight offset 
commits and fetches. The HeartbeatRequestManager should be able to handle this.
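The purge can be sketched as draining a buffer of pending requests and failing each one instead of sending it; the Runnable callback is a simplified stand-in for completing the real request future exceptionally:

```java
import java.util.ArrayDeque;
import java.util.Queue;

class UnsentRequestBuffer {
    private final Queue<Runnable> unsent = new ArrayDeque<>();

    // Each entry's callback fails the pending offset commit/fetch.
    void add(Runnable onFenced) {
        unsent.add(onFenced);
    }

    // Called when the member is fenced: fail everything still queued.
    int purgeOnFenced() {
        int purged = 0;
        while (!unsent.isEmpty()) {
            unsent.poll().run();  // complete the request exceptionally
            purged++;
        }
        return purged;
    }
}
```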



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-10-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15278.

Resolution: Fixed

> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> The necessary Java code representing the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} is already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
>  * Ensure errors are handled correctly
>  * Ensure the MembershipStateManager is updated on both success and failure 
> cases, and that the state machine is transitioned to the correct state.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15562) CommitRequestManager needs to test different error handling

2023-10-06 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15562:
--

 Summary: CommitRequestManager needs to test different error 
handling
 Key: KAFKA-15562
 URL: https://issues.apache.org/jira/browse/KAFKA-15562
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Review the code in ConsumerCoordinator#OffsetCommitResponseHandler.

Implement the error handling and add tests for these errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15553) Review committed offset refresh logic

2023-10-05 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15553:
--

 Summary: Review committed offset refresh logic
 Key: KAFKA-15553
 URL: https://issues.apache.org/jira/browse/KAFKA-15553
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


From the existing comment: "If there are any partitions which do not have a 
valid position and are not awaiting reset, then we need to fetch committed 
offsets."

 

In the async consumer: I wonder if it would make sense to refresh the position 
on the event loop continuously.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15551) Evaluate conditions for short circuiting consumer API calls

2023-10-05 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15551:
--

 Summary: Evaluate conditions for short circuiting consumer API 
calls
 Key: KAFKA-15551
 URL: https://issues.apache.org/jira/browse/KAFKA-15551
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


For conditions like:
 * Committing an empty set of offsets
 * Fetching offsets for an empty set of partitions
 * Getting the position of an empty set of topic partitions

the call should be short-circuited, possibly at the API level.
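The short-circuit can be sketched at the API boundary: an empty input completes immediately without ever crossing into the background thread. The method name and future-based shape are illustrative, not the actual AsyncKafkaConsumer signatures:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class ShortCircuit {
    // Hypothetical commit entry point: partition name -> offset.
    static CompletableFuture<Void> commit(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // Nothing to commit: complete immediately, skip the event loop.
            return CompletableFuture.completedFuture(null);
        }
        // Otherwise the future would be handed off to the background thread.
        return new CompletableFuture<>();
    }
}
```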



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15548) Handling close() properly

2023-10-04 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15548:
--

 Summary: Handling close() properly
 Key: KAFKA-15548
 URL: https://issues.apache.org/jira/browse/KAFKA-15548
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Upon closing we need to:
 # Complete pending commits
 # Auto-commit if needed
 # Send the last GroupConsumerHeartbeatRequest with epoch = -1 to leave the 
group
 # poll the NetworkClient to complete pending I/O
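The shutdown ordering above can be sketched with stub steps that simply record their execution; each string stands in for the real consumer internals:

```java
import java.util.ArrayList;
import java.util.List;

class CloseSequence {
    final List<String> steps = new ArrayList<>();

    // Illustrative close(): runs the four steps from the ticket in order.
    void close(boolean autoCommitEnabled) {
        steps.add("completePendingCommits");
        if (autoCommitEnabled) steps.add("autoCommit");
        steps.add("sendLeaveHeartbeat(epoch=-1)");
        steps.add("pollPendingIO");
    }
}
```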



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15534) Propagate client response time when timeout to the request handler

2023-10-03 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15534:
--

 Summary: Propagate client response time when timeout to the 
request handler
 Key: KAFKA-15534
 URL: https://issues.apache.org/jira/browse/KAFKA-15534
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee
Assignee: Philip Nee


Currently, we don't have a good way to propagate the response time to the 
handler when a timeout is thrown.
{code:java}
unsent.handler.onFailure(new TimeoutException(
"Failed to send request after " + unsent.timer.timeoutMs() + " ms.")); 
{code}
The current request manager invokes a system call to retrieve the response 
time, which is not ideal because the time is already available in the network 
client.

This is an example of the coordinator request manager:
{code:java}
unsentRequest.future().whenComplete((clientResponse, throwable) -> {
long responseTimeMs = time.milliseconds();
if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse) 
clientResponse.responseBody();
onResponse(responseTimeMs, response);
} else {
onFailedResponse(responseTimeMs, throwable);
}
}); {code}
In the NetworkClientDelegate, however, we should use the currentTimeMs passed 
to trySend to avoid calling time.milliseconds():
{code:java}
private void trySend(final long currentTimeMs) {
    // ... inside the loop over unsent requests, on expiration:
    unsent.handler.onFailure(new TimeoutException(
        "Failed to send request after " + unsent.timer.timeoutMs() + " ms."));
    continue;
    // ...
}
{code}
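The proposed change amounts to threading the event loop's timestamp through to the failure handler so the same clock reading is reused. The handler interface below is a simplified stand-in for the real completion handler:

```java
class TimeoutPropagation {
    interface Handler {
        void onFailure(long responseTimeMs, Exception e);
    }

    // currentTimeMs comes from the network thread's poll loop, so the
    // handler sees that clock reading with no extra system call.
    static void failExpired(long currentTimeMs, long timeoutMs, Handler handler) {
        handler.onFailure(currentTimeMs, new RuntimeException(
            "Failed to send request after " + timeoutMs + " ms."));
    }
}
```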



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15533) Ensure HeartbeatRequestManager only send out some fields once

2023-10-03 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15533:
--

 Summary: Ensure HeartbeatRequestManager only send out some fields 
once
 Key: KAFKA-15533
 URL: https://issues.apache.org/jira/browse/KAFKA-15533
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee
Assignee: Philip Nee


We want to ensure the ConsumerGroupHeartbeatRequest is as lightweight as 
possible, so many of its fields do not need to be re-sent. An example is 
rebalanceTimeoutMs; currently we have the following code:

{code:java}
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
.setGroupId(membershipManager.groupId())
.setMemberEpoch(membershipManager.memberEpoch())
.setMemberId(membershipManager.memberId())
.setRebalanceTimeoutMs(rebalanceTimeoutMs); {code}

We should encapsulate these send-once fields in a class such as 
HeartbeatMetadataBuilder, which should track whether a given field still needs 
to be sent.

 

Note that currently only three fields are mandatory in the request:
 * groupId
 * memberEpoch
 * memberId
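The send-once idea can be sketched as a field holder that returns each optional field exactly once; the class name and map-based shape are illustrative, not the actual request data class:

```java
import java.util.HashMap;
import java.util.Map;

class HeartbeatFields {
    // Optional fields that only need to go out on the first heartbeat.
    private final Map<String, Object> sendOnce = new HashMap<>();

    void setSendOnce(String field, Object value) {
        sendOnce.put(field, value);
    }

    // Returns the pending send-once fields and marks them as sent.
    Map<String, Object> drainSendOnce() {
        Map<String, Object> out = new HashMap<>(sendOnce);
        sendOnce.clear();
        return out;
    }
}
```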



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15531) Ensure coordinator node is removed upon disconnection exception

2023-10-03 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15531:
--

 Summary: Ensure coordinator node is removed upon disconnection 
exception
 Key: KAFKA-15531
 URL: https://issues.apache.org/jira/browse/KAFKA-15531
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


In the async consumer, the coordinator isn't being removed when receiving the 
following exception:

 
{code:java}
if (e instanceof DisconnectException) {
    markCoordinatorUnknown(true, e.getMessage());
}
{code}
 

This should happen for all requests going to the coordinator node:

1. heartbeat 2. offset fetch/commit
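The intended handling can be sketched as: any coordinator-bound request that fails with a disconnect marks the coordinator unknown so it gets re-discovered. The marker exception below stands in for Kafka's DisconnectException:

```java
class CoordinatorState {
    private String coordinatorNode = "node-1";

    // Stand-in for org.apache.kafka.common.errors.DisconnectException.
    static class DisconnectLike extends RuntimeException {}

    void onResponseError(Exception e) {
        if (e instanceof DisconnectLike) {
            coordinatorNode = null;  // markCoordinatorUnknown(...)
        }
    }

    boolean hasCoordinator() {
        return coordinatorNode != null;
    }
}
```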



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15475) Timeout request might retry forever even if the user API times out in AsyncConsumer

2023-09-18 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15475:
--

 Summary: Timeout request might retry forever even if the user API 
times out in AsyncConsumer
 Key: KAFKA-15475
 URL: https://issues.apache.org/jira/browse/KAFKA-15475
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


If the request times out in the background thread, it will be completed with a 
TimeoutException, which is retriable. In the TopicMetadataRequestManager and 
possibly other managers, the request might then continue to be retried forever.

 

There are two ways to fix this:
 # Pass a timer to the manager to remove in-flight requests when they expire.
 # Pass the future to the application layer and continue the retry there.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15474) AbstractCoordinator.testWakeupAfterSyncGroupReceivedExternalCompletion seems flaky

2023-09-18 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15474:
--

 Summary: 
AbstractCoordinator.testWakeupAfterSyncGroupReceivedExternalCompletion seems 
flaky
 Key: KAFKA-15474
 URL: https://issues.apache.org/jira/browse/KAFKA-15474
 Project: Kafka
  Issue Type: Test
Affects Versions: 3.6.0
Reporter: Philip Nee


Ran into test failures when running the full AbstractCoordinatorTest unit test 
suite.

It is not very easy to reproduce; it seems to require running the full unit 
test module to reproduce it intermittently.
{code:java}
Should have woken up from ensureActiveGroup()
org.opentest4j.AssertionFailedError: Should have woken up from 
ensureActiveGroup()
    at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
    at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
    at 
app//org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest.testWakeupAfterSyncGroupReceivedExternalCompletion(AbstractCoordinatorTest.java:1531)
    at 
java.base@15.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) {code}
testWakeupAfterSyncGroupReceived also appears flaky:
{code:java}
Should have woken up from ensureActiveGroup()
org.opentest4j.AssertionFailedError: Should have woken up from 
ensureActiveGroup()
    at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
    at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
    at 
app//org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest.testWakeupAfterSyncGroupReceived(AbstractCoordinatorTest.java:1498)
    at 
java.base@15.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
java.base@15.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15333) Flaky build failure throwing Connect Exception: Could not connect to server....

2023-08-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15333:
--

 Summary: Flaky build failure throwing Connect Exception: Could not 
connect to server
 Key: KAFKA-15333
 URL: https://issues.apache.org/jira/browse/KAFKA-15333
 Project: Kafka
  Issue Type: Test
  Components: connect, unit tests
Reporter: Philip Nee


We frequently observe flaky build failures with the following message. This is 
from the most recent PR after the 3.5.0 release:

 
{code:java}
> Task :generator:testClasses UP-TO-DATE
Unexpected exception thrown.
org.gradle.internal.remote.internal.MessageIOException: Could not read message 
from '/127.0.0.1:38354'.
at 
org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:94)
at 
org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
at 
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
at 
org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:47)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException
at 
org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:72)
at 
org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:52)
at 
org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:81)
... 6 more

> Task :streams:upgrade-system-tests-26:unitTest
org.gradle.internal.remote.internal.ConnectException: Could not connect to 
server [3156f144-9a89-4c47-91ad-88a8378ec726 port:37889, 
addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67)
at 
org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36)
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:103)
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:122)
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81)
at 
org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54)
... 5 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2023-08-04 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15305:
--

 Summary: The background thread should try to process the remaining 
task until the shutdown timer is expired
 Key: KAFKA-15305
 URL: https://issues.apache.org/jira/browse/KAFKA-15305
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


While working on https://issues.apache.org/jira/browse/KAFKA-15304

The close() API supplies a timeout parameter so that the consumer has a grace 
period to process things before shutting down. The background thread currently 
does not honor it: when close() is initiated, it immediately closes all of its 
dependencies.

 

This might not be desirable because there could be remaining tasks to process 
before closing. Maybe the correct approach is to first stop accepting API 
requests, then let runOnce() continue until the shutdown timer expires, and 
finally force-close all of its dependencies.
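The grace-period loop can be sketched with a simulated clock: keep processing queued work until the deadline, then drop whatever remains. The queue-of-Runnables and long[] clock are illustrative stand-ins for the real event loop:

```java
import java.util.Queue;

class GracefulShutdown {
    // Process tasks until the simulated clock hits the deadline, then
    // force-close by discarding unfinished work.
    static int runUntilDeadline(long deadlineMs, long[] clock, Queue<Runnable> tasks) {
        int processed = 0;
        while (clock[0] < deadlineMs && !tasks.isEmpty()) {
            tasks.poll().run();  // one runOnce() iteration
            clock[0]++;          // simulated time advancing
            processed++;
        }
        tasks.clear();           // force-close: drop whatever is left
        return processed;
    }
}
```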



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing

2023-08-04 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15304:
--

 Summary: CompletableApplicationEvents aren't being completed when 
the consumer is closing
 Key: KAFKA-15304
 URL: https://issues.apache.org/jira/browse/KAFKA-15304
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


If the background thread is closed before ingesting all ApplicationEvents, we 
should drain the background queue and try to cancel these events before 
closing. We can try to process these events before closing down the consumer; 
however, we assume that when the user issues a close command, the consumer 
should be shut down promptly.
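The drain-and-cancel step can be sketched with CompletableFuture standing in for CompletableApplicationEvent; cancelling wakes up any caller blocked on the event's future:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

class EventDrainer {
    // Drain the queue on close and cancel every pending event.
    static int drainAndCancel(BlockingQueue<CompletableFuture<?>> queue) {
        int cancelled = 0;
        CompletableFuture<?> event;
        while ((event = queue.poll()) != null) {
            event.cancel(true);  // completes exceptionally, unblocking waiters
            cancelled++;
        }
        return cancelled;
    }
}
```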



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15250) DefaultBackgroundThread is running tight loop

2023-07-25 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15250:
--

 Summary: DefaultBackgroundThread is running tight loop
 Key: KAFKA-15250
 URL: https://issues.apache.org/jira/browse/KAFKA-15250
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


The DefaultBackgroundThread is running a tight loop and wasting CPU cycles. I 
think we need to reexamine the timeout passed to NetworkClientDelegate.poll.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15175) Assess the use of nio2 asynchronous channel for KafkaConsumer

2023-07-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15175:
--

 Summary: Assess the use of nio2 asynchronous channel for 
KafkaConsumer
 Key: KAFKA-15175
 URL: https://issues.apache.org/jira/browse/KAFKA-15175
 Project: Kafka
  Issue Type: Wish
  Components: consumer
Reporter: Philip Nee


We should assess whether NIO2 is an appropriate, more performant replacement 
for the current NIO library.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15174) Ensure the correct thread is executing the callbacks

2023-07-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15174:
--

 Summary: Ensure the correct thread is executing the callbacks
 Key: KAFKA-15174
 URL: https://issues.apache.org/jira/browse/KAFKA-15174
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


We need to add assertion tests to ensure the correct thread is executing the 
offset commit callbacks and the rebalance callbacks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15173) ApplicationEventQueue and BackgroundEventQueue should be bounded

2023-07-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15173:
--

 Summary: ApplicationEventQueue and BackgroundEventQueue should be 
bounded
 Key: KAFKA-15173
 URL: https://issues.apache.org/jira/browse/KAFKA-15173
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


The async consumer uses the ApplicationEventQueue and BackgroundEventQueue to 
facilitate message passing between the application thread and the background 
thread. The current implementation is unbounded, which can potentially cause 
OOM and other performance-related issues.

I think the queues need a finite bound, and we need to decide how to handle the 
situation when the bound is reached.  In particular, I would like to answer 
these questions:

 
 # What should the upper limit be for both queues: Can this be a configurable, 
memory-based bound? Or just an arbitrary number of events as the bound.
 # What should happen when the application event queue is filled up?  It seems 
like we should introduce a new exception type and notify the user that the 
consumer is full.
 # What should happen when the background event queue is filled up? This seems 
less likely to happen, but I imagine it could happen when the user stops 
polling the consumer, causing the queue to be filled.
 # Is it necessary to introduce a public configuration for the queue? I think 
initially we would select an arbitrary constant and use community feedback to 
plan a path forward.
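Question 2 above can be sketched as a bounded queue that rejects new events with a dedicated exception when full; the exception type and the bound itself are illustrative assumptions:

```java
import java.util.concurrent.ArrayBlockingQueue;

class BoundedEventQueue<T> {
    // Hypothetical exception type telling the caller the consumer is full.
    static class ConsumerFullException extends RuntimeException {
        ConsumerFullException() {
            super("application event queue is full");
        }
    }

    private final ArrayBlockingQueue<T> queue;

    BoundedEventQueue(int bound) {
        queue = new ArrayBlockingQueue<>(bound);
    }

    void add(T event) {
        // offer() is non-blocking: reject instead of stalling the caller.
        if (!queue.offer(event)) {
            throw new ConsumerFullException();
        }
    }
}
```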



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14977) testDescribeStateOfExistingGroupWithRoundRobinAssignor is flaky

2023-05-08 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14977:
--

 Summary: testDescribeStateOfExistingGroupWithRoundRobinAssignor is 
flaky
 Key: KAFKA-14977
 URL: https://issues.apache.org/jira/browse/KAFKA-14977
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Reporter: Philip Nee
 Attachments: failed_test.log

Relevant tickets: 
- KAFKA-8110 Flaky Test 
DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions
- KAFKA-7969 Flaky Test 
DescribeConsumerGroupTest#testDescribeOffsetsOfExistingGroupWithNoMembers
- KAFKA-8068 Flaky Test 
DescribeConsumerGroupTest#testDescribeMembersOfExistingGroup
- KAFKA-8706 Kafka 2.3.0 Transient Unit Test Failures on Oracle Linux - See 
attached for details
 
See the attachment for failure



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13668) Failed cluster authorization should not be fatal for producer

2023-05-04 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-13668.

Fix Version/s: 3.6.0
   Resolution: Fixed

> Failed cluster authorization should not be fatal for producer
> -
>
> Key: KAFKA-13668
> URL: https://issues.apache.org/jira/browse/KAFKA-13668
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.6.0
>
>
> The idempotent producer fails fatally if the initial `InitProducerId` returns 
> CLUSTER_AUTHORIZATION_FAILED. This makes the producer unusable until a new 
> instance is constructed. For some applications, it is more convenient to keep 
> the producer instance active and let the administrator fix the permission 
> problem instead of going into a crash loop. Additionally, most applications 
> will probably not be smart enough to reconstruct the producer instance, so if 
> the application does not handle the error by failing, users will have to 
> restart the application manually. 
> I think it would be better to let the producer retry the `InitProducerId` 
> request as long as the user keeps trying to use the producer. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2023-05-03 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14016.

Fix Version/s: 3.5.0
   3.4.1
 Assignee: Philip Nee
   Resolution: Fixed

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
>  Labels: new-rebalance-should-fix
> Fix For: 3.5.0, 3.4.1
>
> Attachments: CooperativeStickyAssignorBugReproduction.java
>
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync 
> group fails with a REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of its partitions with cooperative rebalancing enabled, because if 
> a consumer does a very quick re-join, other consumers will get 
> REBALANCE_IN_PROGRESS in syncGroup and revoke their partitions before 
> re-joining. Example:
>  # consumer A1-A10 (ten consumers) joined and synced group successfully with 
> generation 1 
>  # New consumer B1 joined and start a rebalance
>  # all consumers joined successfully, and then A1 needs to revoke a partition 
> to transfer to B1
>  # A1 does a very quick syncGroup and re-join, because it revoked a partition
>  # A2-A10 didn't send syncGroup before A1's re-join, so after they send 
> syncGroup they will get REBALANCE_IN_PROGRESS
>  # A2-A10 will revoke their partitions and re-join
> So in this rebalance almost every partition is revoked, which greatly 
> decreases the benefit of cooperative rebalancing.
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # In the server coordinator's handleSyncGroup, when the generationId has 
> been checked and the group state is PreparingRebalance, we can send the 
> assignment along with the error code REBALANCE_IN_PROGRESS. (I think it's 
> safe since we verified the generation first.)
>  # When the client gets the REBALANCE_IN_PROGRESS error, try to apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive

2023-05-03 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-13891.

Fix Version/s: 3.5.0
   3.4.1
   (was: 3.6.0)
   Resolution: Fixed

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when sync group failed 
> with a rebalanceInProgress error. So the previous bug still exists, and it 
> may cause the consumer to rebalance many rounds before finally stabilizing.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced group successfully with generation 1 *( with 
> ownedPartition P1/P2 )*
>  # New rebalance started with generation 2, consumer A joined successfully, 
> but somehow, consumer A doesn't send out sync group immediately
>  # other consumer completed sync group successfully in generation 2, except 
> consumer A.
>  # After consumer A sends out sync group, a new rebalance starts with 
> generation 3, so consumer A gets a REBALANCE_IN_PROGRESS error in the sync 
> group response
>  # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with 
> generation 3, with the assignment (ownedPartition) in generation 1.
>  # So now an out-of-date ownedPartition is sent, with unexpected results
>  # *After the generation-3 rebalance, consumer A got P3/P4 partition. the 
> ownedPartition is ignored because of old generation.*
>  # *consumer A revoke P1/P2 and re-join to start a new round of rebalance*
>  # *If some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue happens again, resulting in many rounds of 
> rebalance before the group is stable*
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14960) Metadata Request Manager and listTopics/partitionsFor API

2023-05-02 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14960:
--

 Summary: Metadata Request Manager and listTopics/partitionsFor API
 Key: KAFKA-14960
 URL: https://issues.apache.org/jira/browse/KAFKA-14960
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Implement listTopics and partitionsFor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14950) Implement assign() and assignment()

2023-04-27 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14950:
--

 Summary: Implement assign() and assignment()
 Key: KAFKA-14950
 URL: https://issues.apache.org/jira/browse/KAFKA-14950
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Implement assign() and assignment()



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14875) Implement Wakeup()

2023-03-31 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14875:
--

 Summary: Implement Wakeup()
 Key: KAFKA-14875
 URL: https://issues.apache.org/jira/browse/KAFKA-14875
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Implement wakeup() and WakeupException. This would be different from the 
current implementation because I think we just need to interrupt the blocking 
futures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14754) improve consumer under example package

2023-03-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14754.

Resolution: Fixed

> improve consumer under example package
> --
>
> Key: KAFKA-14754
> URL: https://issues.apache.org/jira/browse/KAFKA-14754
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Philip Nee
>Priority: Major
>
> I found the producer and consumer examples are not in good form. For example:
>  # Neither the consumer nor the producer gracefully closes its resources 
> after completing.
>  # The examples don't show how to handle different kinds of exceptions; they 
> only cover the happy path.
>  # There are no clear comments instructing users why each operation is 
> needed and what it is doing.
>  





[jira] [Resolved] (KAFKA-14753) improve producer under example package

2023-03-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14753.

Resolution: Fixed

> improve producer under example package
> --
>
> Key: KAFKA-14753
> URL: https://issues.apache.org/jira/browse/KAFKA-14753
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>
> I found the producer and consumer examples are not in good form. For example:
>  # Neither the consumer nor the producer gracefully closes its resources 
> after completing.
>  # The examples don't show how to handle different kinds of exceptions; they 
> only cover the happy path.
>  # There are no clear comments instructing users why each operation is 
> needed and what it is doing.
>  





[jira] [Reopened] (KAFKA-14752) improve kafka examples under examples package

2023-03-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reopened KAFKA-14752:


> improve kafka examples under examples package
> -
>
> Key: KAFKA-14752
> URL: https://issues.apache.org/jira/browse/KAFKA-14752
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>  Labels: newbie, newbie++
>
> Kafka provides some examples under the "examples" package. Currently we 
> provide:
>  * java-producer-consumer-demo, which is to produce 1 records and then 
> consume all of them
>  * exactly-once-demo, which is to produce records -> consume -> process -> 
> consume.
> Among them, the base components are the producer and consumer. However, I 
> found the producer and consumer examples are not in good form. For example:
>  # Neither the consumer nor the producer gracefully closes its resources 
> after completing.
>  # The examples don't show how to handle different kinds of exceptions; they 
> only cover the happy path.
>  # There are no clear comments instructing users why each operation is 
> needed and what it is doing.
>  
> Furthermore, while running both examples, I saw a flood of log output 
> because we print one line per message sent and one line per message 
> received. In java-producer-consumer-demo, there will be 1 records 
> sent/received, so > 2 lines of log output. The same goes for 
> exactly-once-demo. Maybe we should consider reducing the number of records.
>  
> One more thing: exactly-once-demo.java has a clear class-level Javadoc, but 
> java-producer-consumer-demo.java has none. We should improve that as well.
>  
>  
>  





[jira] [Resolved] (KAFKA-14752) improve kafka examples under examples package

2023-03-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14752.

Resolution: Fixed

> improve kafka examples under examples package
> -
>
> Key: KAFKA-14752
> URL: https://issues.apache.org/jira/browse/KAFKA-14752
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Federico Valeri
>Priority: Major
>  Labels: newbie, newbie++
>
> Kafka provides some examples under the "examples" package. Currently we 
> provide:
>  * java-producer-consumer-demo, which is to produce 1 records and then 
> consume all of them
>  * exactly-once-demo, which is to produce records -> consume -> process -> 
> consume.
> Among them, the base components are the producer and consumer. However, I 
> found the producer and consumer examples are not in good form. For example:
>  # Neither the consumer nor the producer gracefully closes its resources 
> after completing.
>  # The examples don't show how to handle different kinds of exceptions; they 
> only cover the happy path.
>  # There are no clear comments instructing users why each operation is 
> needed and what it is doing.
>  
> Furthermore, while running both examples, I saw a flood of log output 
> because we print one line per message sent and one line per message 
> received. In java-producer-consumer-demo, there will be 1 records 
> sent/received, so > 2 lines of log output. The same goes for 
> exactly-once-demo. Maybe we should consider reducing the number of records.
>  
> One more thing: exactly-once-demo.java has a clear class-level Javadoc, but 
> java-producer-consumer-demo.java has none. We should improve that as well.
>  
>  
>  





[jira] [Created] (KAFKA-14761) Integration Tests for the New Consumer Implementation

2023-02-24 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14761:
--

 Summary: Integration Tests for the New Consumer Implementation
 Key: KAFKA-14761
 URL: https://issues.apache.org/jira/browse/KAFKA-14761
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


This Jira tracks the effort of integration testing for the new consumer we are 
implementing.





[jira] [Created] (KAFKA-14468) Refactor Commit Logic

2022-12-13 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14468:
--

 Summary: Refactor Commit Logic
 Key: KAFKA-14468
 URL: https://issues.apache.org/jira/browse/KAFKA-14468
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Refactor commit logic using the new multi-threaded coordinator construct.





[jira] [Created] (KAFKA-14438) Stop supporting empty consumer groupId

2022-12-02 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14438:
--

 Summary: Stop supporting empty consumer groupId
 Key: KAFKA-14438
 URL: https://issues.apache.org/jira/browse/KAFKA-14438
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
 Fix For: 4.0.0


Currently, a warning message is logged when an empty consumer groupId is used. 
In the next major release, we should drop support for the empty ("") consumer 
groupId.
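In practical terms, applications should set a non-empty group id explicitly before upgrading; for example (the group name below is arbitrary):

```properties
# group.id must be non-empty once support for "" is dropped
group.id=my-consumer-group
```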

 

cc [~hachikuji] 





[jira] [Resolved] (KAFKA-14252) Create background thread skeleton

2022-10-19 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-14252.

Resolution: Fixed

> Create background thread skeleton
> -
>
> Key: KAFKA-14252
> URL: https://issues.apache.org/jira/browse/KAFKA-14252
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> The event handler internally instantiates a background thread to consume 
> ApplicationEvents and produce BackgroundEvents.  In this ticket, we will 
> create a skeleton of the background thread.  We will incrementally add 
> implementation in the future.





[jira] [Created] (KAFKA-14274) Implement fetching logic

2022-10-03 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14274:
--

 Summary: Implement fetching logic
 Key: KAFKA-14274
 URL: https://issues.apache.org/jira/browse/KAFKA-14274
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee


The fetch request and fetch processing should happen asynchronously.  More 
specifically, the background thread sends fetch requests autonomously and 
relays the responses back to the polling thread.  The polling thread collects 
these fetch responses and returns them as ConsumerRecords.
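A minimal sketch of that hand-off, assuming a blocking-queue buffer between the two threads. The class name, method names, and the use of plain String records are placeholders, not the actual Kafka internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical buffer between the background (fetching) thread and the
// polling thread.
class FetchBuffer {
    private final BlockingQueue<List<String>> completedFetches =
            new LinkedBlockingQueue<>();

    // Background thread: hand a completed fetch response to the buffer.
    public void onFetchResponse(List<String> records) {
        completedFetches.add(records);
    }

    // Polling thread: wait briefly for at least one completed fetch, then
    // drain whatever else has arrived without blocking further.
    public List<String> pollRecords(long timeoutMs) {
        List<String> result = new ArrayList<>();
        try {
            List<String> first =
                    completedFetches.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                result.addAll(first);
                List<String> more;
                while ((more = completedFetches.poll()) != null) {
                    result.addAll(more);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}
```

The key property is that the polling thread never issues network requests itself; it only drains results the background thread has already completed.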





[jira] [Created] (KAFKA-14264) Create a coordinator class for the new implementation

2022-09-27 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14264:
--

 Summary: Create a coordinator class for the new implementation
 Key: KAFKA-14264
 URL: https://issues.apache.org/jira/browse/KAFKA-14264
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


To refactor the consumer, we changed how the coordinator is called.  However, 
there will be a period where the old and new implementations need to coexist, 
so we will need to override some of the methods and create a new 
implementation of the coordinator.  In particular:
 # ensureCoordinatorReady needs to be non-blocking, or we could just use 
sendFindCoordinatorRequest.
 # joinGroupIfNeeded needs to be broken up into finer-grained stages for the 
new implementation to work.
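One way to make the coordinator lookup non-blocking is to return a future that the network layer completes, instead of spinning until the coordinator is ready. The sketch below is an assumption about the shape of such an API; the class name, String node representation, and single-threaded-caller model are all illustrative.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical non-blocking coordinator lookup. Assumes findCoordinator()
// and onResponse() are only invoked from the background thread.
class CoordinatorLookup {
    private volatile Optional<String> coordinator = Optional.empty();
    private CompletableFuture<String> inFlight;

    // Non-blocking: returns immediately with a future for the coordinator.
    public CompletableFuture<String> findCoordinator() {
        if (coordinator.isPresent()) {
            return CompletableFuture.completedFuture(coordinator.get());
        }
        if (inFlight == null) {
            // At most one FindCoordinator request outstanding at a time.
            inFlight = new CompletableFuture<>();
        }
        return inFlight;
    }

    // Invoked when the FindCoordinator response arrives from the network.
    public void onResponse(String node) {
        coordinator = Optional.of(node);
        CompletableFuture<String> f = inFlight;
        inFlight = null;
        if (f != null) {
            f.complete(node);
        }
    }
}
```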





[jira] [Created] (KAFKA-14252) Create background thread skeleton

2022-09-21 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14252:
--

 Summary: Create background thread skeleton
 Key: KAFKA-14252
 URL: https://issues.apache.org/jira/browse/KAFKA-14252
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


The event handler internally instantiates a background thread to consume 
ApplicationEvents and produce BackgroundEvents.  In this ticket, we will create 
a skeleton of the background thread.  We will incrementally add implementation 
in the future.





[jira] [Created] (KAFKA-14247) Implement EventHandler interface and DefaultEventHandler

2022-09-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14247:
--

 Summary: Implement EventHandler interface and DefaultEventHandler
 Key: KAFKA-14247
 URL: https://issues.apache.org/jira/browse/KAFKA-14247
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


The polling thread uses events to communicate with the background thread.  The 
events sent to the background thread are the {_}Requests{_}, and the events 
sent from the background thread to the polling thread are the {_}Responses{_}.

 

Here we have an EventHandler interface and a DefaultEventHandler 
implementation.  The implementation uses two blocking queues to send events in 
both directions.  The two methods, add and poll, allow the client (i.e., the 
polling thread) to add events to and retrieve events from the handler.
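The two-queue design described above can be sketched as follows. The event types here are empty placeholders, and the "Sketch" suffix marks this as illustrative; the real EventHandler/DefaultEventHandler live in the Kafka consumer internals.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Placeholder event types; the real hierarchies carry request/response data.
interface ApplicationEvent {}
interface BackgroundEvent {}

// Minimal sketch of the two-queue hand-off between the polling thread and
// the background thread.
class DefaultEventHandlerSketch {
    private final BlockingQueue<ApplicationEvent> applicationEvents =
            new LinkedBlockingQueue<>();
    private final BlockingQueue<BackgroundEvent> backgroundEvents =
            new LinkedBlockingQueue<>();

    // Polling thread: enqueue a Request for the background thread.
    public boolean add(ApplicationEvent event) {
        return applicationEvents.offer(event);
    }

    // Polling thread: retrieve a Response, if any, without blocking.
    public Optional<BackgroundEvent> poll() {
        return Optional.ofNullable(backgroundEvents.poll());
    }

    // Background thread: block until the next Request arrives.
    ApplicationEvent takeRequest() throws InterruptedException {
        return applicationEvents.take();
    }

    // Background thread: publish a Response for the polling thread.
    void respond(BackgroundEvent event) {
        backgroundEvents.offer(event);
    }
}
```

Because each thread only ever touches its own end of each queue, no additional locking is needed for the hand-off itself.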

 

PR: https://github.com/apache/kafka/pull/12663





[jira] [Created] (KAFKA-14246) Refactor KafkaConsumer Threading Model

2022-09-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14246:
--

 Summary: Refactor KafkaConsumer Threading Model
 Key: KAFKA-14246
 URL: https://issues.apache.org/jira/browse/KAFKA-14246
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


Hi community,

 

We are refactoring the current KafkaConsumer and making it more asynchronous.  
This is the proposal's placeholder; subtasks will be linked to this ticket.  
Please review the design document and feel free to use this thread for 
discussion. 

 

The design document is here: 
[https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor]

 

The original email thread is here: 
https://lists.apache.org/thread/13jvwzkzmb8c6t7drs4oj2kgkjzcn52l

 

I will continue to update the one-pager as reviews and comments come in.

 

Thanks, 

P





[jira] [Created] (KAFKA-14196) Flaky OffsetValidationTest seems to indicate potential duplication issue during rebalance

2022-09-01 Thread Philip Nee (Jira)
Philip Nee created KAFKA-14196:
--

 Summary: Flaky OffsetValidationTest seems to indicate potential 
duplication issue during rebalance
 Key: KAFKA-14196
 URL: https://issues.apache.org/jira/browse/KAFKA-14196
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.2.1
Reporter: Philip Nee


Several flaky tests under OffsetValidationTest indicate a potential consumer 
duplication issue when autocommit is enabled.  Below is the failure message:

 
{code:java}
Total consumed records 3366 did not match consumed position 3331 {code}
 

After investigating the logs, I discovered that the data consumed between the 
start of a rebalance event and the async commit was lost in those failing 
tests.  In the example below, the rebalance event kicks in at around 
1662054846995 (first record), and the async commit of offset 3739 completes at 
around 1662054847015 (right before partitions_revoked).

 
{code:java}
{"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
{"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
{"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
{"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
 {code}
A few things to note here:
 # This is highly flaky; I found that about 1/4 of the runs fail the test.
 # Manually calling commitSync in the onPartitionsRevoked callback seems to 
alleviate the issue.
 # Setting includeMetadataInTimeout to false also seems to alleviate the issue.

 


