bbejeck commented on code in PR #17328: URL: https://github.com/apache/kafka/pull/17328#discussion_r1809553262
########## tests/kafkatest/tests/streams/streams_upgrade_test.py: ########## @@ -111,102 +105,10 @@ def perform_broker_upgrade(self, to_version): node.version = KafkaVersion(to_version) self.kafka.start_node(node) - @ignore @cluster(num_nodes=6) - @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions) - def test_upgrade_downgrade_brokers(self, from_version, to_version): - """ - Start a smoke test client then perform rolling upgrades on the broker. - """ - - if from_version == to_version: - return - - self.replication = 3 - self.num_kafka_nodes = 3 - self.partitions = 1 - self.isr = 2 - self.topics = { - 'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr}}, - 'data' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'min' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'max' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} } - } - - # Setup phase - self.zk 
= ZookeeperService(self.test_context, num_nodes=1) - self.zk.start() - - # number of nodes needs to be >= 3 for the smoke test - self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes, - zk=self.zk, version=KafkaVersion(from_version), topics=self.topics) - self.kafka.start() - - # allow some time for topics to be created - wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())), - timeout_sec=60, - err_msg="Broker did not create all topics in 60 seconds ") - - self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) - - processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once") - - with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor: - self.driver.start() - - with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: - processor.start() - monitor.wait_until(self.processed_data_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node)) - - connected_message = "Discovered group coordinator" - with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor: - with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor: - self.perform_broker_upgrade(to_version) - - log_monitor.wait_until(connected_message, - timeout_sec=120, - err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account)) - - stdout_monitor.wait_until(self.processed_data_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account)) - - # SmokeTestDriver allows up to 6 minutes to consume all - # records for the verification step so this timeout is set to - # 6 minutes (360 seconds) for consuming of verification records - # and a very conservative additional 2 minutes (120 seconds) to process - # the records in the verification step - 
driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED', - timeout_sec=480, - err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account)) - - self.driver.stop() - processor.stop() - processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) - - @cluster(num_nodes=6) - @matrix(from_version=metadata_1_versions) - @matrix(from_version=metadata_2_versions) - @matrix(from_version=fk_join_versions) - def test_rolling_upgrade_with_2_bounces(self, from_version): + @matrix(from_version=metadata_2_versions, metadata_quorum=[quorum.combined_kraft]) + @matrix(from_version=fk_join_versions, metadata_quorum=[quorum.combined_kraft]) + def test_rolling_upgrade_with_2_bounces(self, from_version, metadata_quorum): Review Comment: The `metadata_quorum` parameter is required to tell the test to use KRaft for the metadata quorum. If it is omitted from the `@matrix` annotation, the test defaults to ZK and fails with this error: ```File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 541, in security_config zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure, AttributeError: 'NoneType' object has no attribute 'zk_sasl' ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org