[ 
https://issues.apache.org/jira/browse/KAFKA-17962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17962:
------------------------------
    Description: 
Many of the system tests in {{connect_distributed_test.py}} fail because the 
parameterized value for {{group_protocol}} is not consistently passed down 
through all parts of the system test. This can be confirmed by looking 
through the system test's results directory after the test fails, e.g. 
{{results/2024-11-25--002/ConnectDistributedTest/test_pause_and_resume_sink/connect_protocol=compatible.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer/1}}.

Here's what it shows for me:

{code:bash}
$ grep -R "group.protocol = " ConnectDistributedService-0-281473237465016
ConnectDistributedService-0-281473237465016/ducker06/connect.log:       group.protocol = classic
ConnectDistributedService-0-281473237465016/ducker06/connect.log:       group.protocol = classic
ConnectDistributedService-0-281473237465016/ducker06/connect.log:       group.protocol = classic
ConnectDistributedService-0-281473237465016/ducker05/connect.log:       group.protocol = classic
ConnectDistributedService-0-281473237465016/ducker05/connect.log:       group.protocol = classic
ConnectDistributedService-0-281473237465016/ducker05/connect.log:       group.protocol = classic
ConnectDistributedService-0-281473237465016/ducker05/connect.log:       group.protocol = consumer
ConnectDistributedService-0-281473237465016/ducker04/connect.log:       group.protocol = classic
ConnectDistributedService-0-281473237465016/ducker04/connect.log:       group.protocol = classic
ConnectDistributedService-0-281473237465016/ducker04/connect.log:       group.protocol = classic
{code}

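The {{ConsumerConfig}} output shows that only one of the ten logged 
{{group.protocol}} values is {{consumer}}; the other nine are {{classic}}. 
The parameterized {{group.protocol}} is therefore not applied to every 
{{KafkaConsumer}} configured during the test.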
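
The same check can be scripted so that it reports counts per log file instead of raw grep lines. The following is only a sketch: the function names {{tally_group_protocol}} and {{mismatches}} are hypothetical helpers, not part of kafkatest or ducktape, and it assumes each worker log is named {{connect.log}} and prints the setting as {{group.protocol = <value>}}, as in the grep output above.

```python
import re
from collections import Counter
from pathlib import Path

# Same pattern the grep command looks for: "group.protocol = <value>".
GROUP_PROTOCOL_RE = re.compile(r"group\.protocol = (\S+)")


def tally_group_protocol(results_dir):
    """Count logged group.protocol values for each connect.log under results_dir.

    Hypothetical helper: returns {relative log path: Counter of values}.
    """
    tallies = {}
    for log_path in sorted(Path(results_dir).rglob("connect.log")):
        counts = Counter()
        with log_path.open(errors="replace") as log_file:
            for line in log_file:
                match = GROUP_PROTOCOL_RE.search(line)
                if match:
                    counts[match.group(1)] += 1
        if counts:
            tallies[str(log_path.relative_to(results_dir))] = counts
    return tallies


def mismatches(tallies, expected):
    """Keep only the logs that recorded at least one value other than `expected`."""
    return {
        path: counts
        for path, counts in tallies.items()
        if any(value != expected for value in counts)
    }
```

For example, calling {{mismatches(tally_group_protocol("ConnectDistributedService-0-281473237465016"), "consumer")}} from the test's results directory would list every log that recorded a value other than {{consumer}}.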

This results in failures such as the one shown in this 
{{test_pause_and_resume_sink.connect_protocol}} output:

{noformat}
TimeoutError('Failed to consume messages after resuming sink connector')
Traceback (most recent call last):
  File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 351, in _do_run
    data = self.run_test()
  File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 411, in run_test
    return self.test_context.function(self.test)
  File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py", line 438, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 415, in test_pause_and_resume_sink
    wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
  File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/utils/util.py", line 58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
ducktape.errors.TimeoutError: Failed to consume messages after resuming sink connector
{noformat}

  was:
The following failure is consistently seen when running 
{{test_pause_and_resume_sink.connect_protocol}}:
{noformat}
TimeoutError('Failed to consume messages after resuming sink connector')
Traceback (most recent call last):
  File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 351, in _do_run
    data = self.run_test()
  File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 411, in run_test
    return self.test_context.function(self.test)
  File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py", line 438, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py", line 415, in test_pause_and_resume_sink
    wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
  File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/utils/util.py", line 58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
ducktape.errors.TimeoutError: Failed to consume messages after resuming sink connector
{noformat}
Parameters on failure:
 * connect_protocol=compatible
 * metadata_quorum=ISOLATED_KRAFT
 * use_new_coordinator=True
 * group_protocol=consumer


> Upgrade Connect distributed system tests to handle CONSUMER group protocol
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-17962
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17962
>             Project: Kafka
>          Issue Type: Test
>          Components: clients, consumer, system tests
>    Affects Versions: 3.9.0
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: kip-848-client-support
>             Fix For: 4.0.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
