[
https://issues.apache.org/jira/browse/KAFKA-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax resolved KAFKA-8017.
------------------------------------
Resolution: Invalid
The streams-broker-upgrade test was delete at some point in the past. Closing
this ticket.
> Narrow the scope of Streams' broker-upgrade-test
> ------------------------------------------------
>
> Key: KAFKA-8017
> URL: https://issues.apache.org/jira/browse/KAFKA-8017
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
>
> We had a streams-broker-upgrade test in which we kept the streams client as
> the dev version, and upgrade/downgrade brokers between arbitrary versions.
> This has several issues:
> 1) not all upgrade / downgrade paths are supported due to message format
> change.
> 2) even for those supported paths, we should consider the impact of
> inter.broker.protocol and message.format. More specifically: when upgrade to
> new version byte code, we should stick with the old protocol/version, when
> down grade to old version byte code, we should start with the old
> protocol/version.
> A good reference to look at is the broker's own upgrade path where they
> listed all the possible path so far:
> {code}
> @parametrize(from_kafka_version=str(LATEST_1_1),
> to_message_format_version=None, compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_1_1),
> to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_1_0),
> to_message_format_version=None, compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_1_0),
> to_message_format_version=None, compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_11_0),
> to_message_format_version=None, compression_types=["gzip"])
> @parametrize(from_kafka_version=str(LATEST_0_11_0),
> to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_0_10_2),
> to_message_format_version=str(LATEST_0_9), compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_0_10_2),
> to_message_format_version=str(LATEST_0_10), compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_10_2),
> to_message_format_version=None, compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_0_10_2),
> to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_0_10_1),
> to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_0_10_1),
> to_message_format_version=None, compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_10_0),
> to_message_format_version=None, compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_10_0),
> to_message_format_version=None, compression_types=["lz4"])
> @cluster(num_nodes=7)
> @parametrize(from_kafka_version=str(LATEST_0_9),
> to_message_format_version=None, compression_types=["none"],
> security_protocol="SASL_SSL")
> @cluster(num_nodes=6)
> @parametrize(from_kafka_version=str(LATEST_0_9),
> to_message_format_version=None, compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_9),
> to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_0_9),
> to_message_format_version=str(LATEST_0_9), compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_0_9),
> to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
> @cluster(num_nodes=7)
> @parametrize(from_kafka_version=str(LATEST_0_8_2),
> to_message_format_version=None, compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_0_8_2),
> to_message_format_version=None, compression_types=["snappy"])
> def test_upgrade(self, from_kafka_version, to_message_format_version,
> compression_types,
> security_protocol="PLAINTEXT"):
> {code}
> And their upgrade code is:
> {code}
> def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
> self.logger.info("First pass bounce - rolling upgrade")
> for node in self.kafka.nodes:
> self.kafka.stop_node(node)
> node.version = DEV_BRANCH
> node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] =
> from_kafka_version
> node.config[config_property.MESSAGE_FORMAT_VERSION] =
> from_kafka_version
> self.kafka.start_node(node)
> self.logger.info("Second pass bounce - remove
> inter.broker.protocol.version config")
> for node in self.kafka.nodes:
> self.kafka.stop_node(node)
> del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
> if to_message_format_version is None:
> del node.config[config_property.MESSAGE_FORMAT_VERSION]
> else:
> node.config[config_property.MESSAGE_FORMAT_VERSION] =
> to_message_format_version
> self.kafka.start_node(node)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)