dajac commented on code in PR #14582:
URL: https://github.com/apache/kafka/pull/14582#discussion_r1395449792
########## tests/kafkatest/tests/connect/connect_distributed_test.py: ##########
@@ -560,8 +633,19 @@ def _wait_for_loggers(self, level, request_time, namespace, workers=None):
        )
    @cluster(num_nodes=6)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
-    def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum):
+    @matrix(
+        security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
+        exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'],

Review Comment:
   nit: Should `connect_protocol` be on a new line?

########## tests/kafkatest/tests/client/consumer_test.py: ##########
@@ -450,10 +532,21 @@ def __init__(self, test_context):
        })
    @cluster(num_nodes=6)
-    @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+    @matrix(
+        assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                                 "org.apache.kafka.clients.consumer.RoundRobinAssignor",
-                                "org.apache.kafka.clients.consumer.StickyAssignor"], metadata_quorum=quorum.all_non_upgrade)
-    def test_valid_assignment(self, assignment_strategy, metadata_quorum=quorum.zk):
+                                "org.apache.kafka.clients.consumer.StickyAssignor"],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+                             "org.apache.kafka.clients.consumer.RoundRobinAssignor",
+                             "org.apache.kafka.clients.consumer.StickyAssignor"],

Review Comment:
   ditto.

########## tests/kafkatest/tests/connect/connect_distributed_test.py: ##########
@@ -560,8 +633,19 @@ def _wait_for_loggers(self, level, request_time, namespace, workers=None):
        )
    @cluster(num_nodes=6)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
-    def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum):
+    @matrix(
+        security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
+        exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
+        exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'],

Review Comment:
   ditto.

########## tests/kafkatest/tests/core/security_rolling_upgrade_test.py: ##########
@@ -126,10 +126,10 @@ def remove_separate_broker_listener(self, client_security_protocol):
        self.bounce()
    @cluster(num_nodes=8)
-    @matrix(client_protocol=[SecurityConfig.SSL])
+    @matrix(client_protocol=[SecurityConfig.SSL], use_new_coordinator=[True, False])

Review Comment:
   Do all the tests in this file use ZK or KRaft? I suppose that it is ZK by default...
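
To make the question above concrete: if the intent were to exercise KRaft in security_rolling_upgrade_test.py as well, the quorum could be made explicit with the same two-@matrix pattern used elsewhere in this PR. Below is a minimal sketch; the class name, test name, and empty body are assumptions for illustration, while the imports follow the usual kafkatest conventions.

from ducktape.mark import matrix
from ducktape.mark.resource import cluster

from kafkatest.services.kafka import quorum
from kafkatest.services.security.security_config import SecurityConfig


class RollingUpgradeQuorumSketch:
    # Hypothetical test class: the first @matrix runs only ZK with
    # use_new_coordinator=False, the second runs isolated KRaft with both values.
    @cluster(num_nodes=8)
    @matrix(
        client_protocol=[SecurityConfig.SSL],
        metadata_quorum=[quorum.zk],
        use_new_coordinator=[False]
    )
    @matrix(
        client_protocol=[SecurityConfig.SSL],
        metadata_quorum=[quorum.isolated_kraft],
        use_new_coordinator=[True, False]
    )
    def test_rolling_upgrade_sketch(self, client_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False):
        # Body omitted; only the parametrization pattern matters here.
        pass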
########## tests/kafkatest/tests/core/transactions_test.py: ##########
@@ -246,8 +246,9 @@ def setup_topics(self):
    @matrix(failure_mode=["hard_bounce", "clean_bounce"],
            bounce_target=["brokers", "clients"],
            check_order=[True, False],
-            use_group_metadata=[True, False])
-    def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.all):
+            use_group_metadata=[True, False],
+            use_new_coordinator=[True, False])

Review Comment:
   I suppose that this one fails because we haven't implemented transactional offset commits yet. I also wonder if they are really executed with KRaft.

########## tests/kafkatest/tests/client/consumer_test.py: ##########
@@ -450,10 +532,21 @@ def __init__(self, test_context):
        })
    @cluster(num_nodes=6)
-    @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+    @matrix(
+        assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                                 "org.apache.kafka.clients.consumer.RoundRobinAssignor",
-                                "org.apache.kafka.clients.consumer.StickyAssignor"], metadata_quorum=quorum.all_non_upgrade)
-    def test_valid_assignment(self, assignment_strategy, metadata_quorum=quorum.zk):
+                                "org.apache.kafka.clients.consumer.StickyAssignor"],

Review Comment:
   nit: Let's align the assignor names.

########## tests/kafkatest/tests/streams/streams_broker_compatibility_test.py: ##########
@@ -64,25 +64,12 @@ def setUp(self):
    @cluster(num_nodes=4)
-    @parametrize(broker_version=str(LATEST_3_5))
-    @parametrize(broker_version=str(LATEST_3_4))
-    @parametrize(broker_version=str(LATEST_3_3))
-    @parametrize(broker_version=str(LATEST_3_2))
-    @parametrize(broker_version=str(LATEST_3_1))
-    @parametrize(broker_version=str(LATEST_3_0))
-    @parametrize(broker_version=str(LATEST_2_8))
-    @parametrize(broker_version=str(LATEST_2_7))
-    @parametrize(broker_version=str(LATEST_2_6))
-    @parametrize(broker_version=str(LATEST_2_5))
-    @parametrize(broker_version=str(LATEST_2_4))
-    @parametrize(broker_version=str(LATEST_2_3))
-    @parametrize(broker_version=str(LATEST_2_2))
-    @parametrize(broker_version=str(LATEST_2_1))
-    @parametrize(broker_version=str(LATEST_2_0))
-    @parametrize(broker_version=str(LATEST_1_1))
-    @parametrize(broker_version=str(LATEST_1_0))
-    @parametrize(broker_version=str(LATEST_0_11_0))
-    def test_compatible_brokers_eos_disabled(self, broker_version):
+    @matrix(

Review Comment:
   Is KRaft used by default here as well?

########## tests/kafkatest/tests/core/replica_scale_test.py: ##########
@@ -103,8 +123,28 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor,
        trogdor.stop()
    @cluster(num_nodes=12)
-    @matrix(topic_count=[50], partition_count=[34], replication_factor=[3], metadata_quorum=quorum.all_non_upgrade)
-    def test_clean_bounce(self, topic_count, partition_count, replication_factor, metadata_quorum=quorum.zk):
+    @matrix(
+        topic_count=[50],
+        partition_count=[34],
+        replication_factor=[3],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        topic_count=[50],
+        partition_count=[34],
+        replication_factor=[3],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_clean_bounce(

Review Comment:
   ditto.
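
To make the alignment nit on consumer_test.py concrete, here is a sketch of the decorator with the assignor names lined up under the first list element; the standalone class, the defaults, and the empty body are illustrative assumptions.

from ducktape.mark import matrix
from ducktape.mark.resource import cluster

from kafkatest.services.kafka import quorum


class AssignmentMatrixSketch:
    # Hypothetical class; the point is that the continuation lines are aligned
    # with the first assignor string inside assignment_strategy=[...].
    @cluster(num_nodes=6)
    @matrix(
        assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                             "org.apache.kafka.clients.consumer.RoundRobinAssignor",
                             "org.apache.kafka.clients.consumer.StickyAssignor"],
        metadata_quorum=[quorum.zk],
        use_new_coordinator=[False]
    )
    def test_valid_assignment(self, assignment_strategy, metadata_quorum=quorum.zk, use_new_coordinator=False):
        # Body omitted; only the decorator formatting matters here.
        pass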
########## tests/kafkatest/tests/core/replica_scale_test.py: ##########
@@ -48,8 +48,28 @@ def teardown(self):
        self.zk.stop()
    @cluster(num_nodes=12)
-    @matrix(topic_count=[50], partition_count=[34], replication_factor=[3], metadata_quorum=quorum.all_non_upgrade)
-    def test_produce_consume(self, topic_count, partition_count, replication_factor, metadata_quorum=quorum.zk):
+    @matrix(
+        topic_count=[50],
+        partition_count=[34],
+        replication_factor=[3],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        topic_count=[50],
+        partition_count=[34],
+        replication_factor=[3],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_produce_consume(

Review Comment:
   nit: I don't think we use this style to format method definitions in Python.
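
On the last nit: a sketch of the more conventional style, with the decorator still split across lines but the method signature wrapped rather than one parameter per line. The standalone class and empty body are assumptions for illustration; the default values mirror the original diff.

from ducktape.mark import matrix
from ducktape.mark.resource import cluster

from kafkatest.services.kafka import quorum


class ReplicaScaleSketch:
    # Hypothetical class; shows the signature kept in the usual wrapped-parameter form.
    @cluster(num_nodes=12)
    @matrix(
        topic_count=[50],
        partition_count=[34],
        replication_factor=[3],
        metadata_quorum=[quorum.isolated_kraft],
        use_new_coordinator=[True, False]
    )
    def test_produce_consume(self, topic_count, partition_count, replication_factor,
                             metadata_quorum=quorum.zk, use_new_coordinator=False):
        # Body omitted; only the signature formatting matters here.
        pass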