Gerrrr commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r839408299
##########
File path: docs/streams/upgrade-guide.html
##########
@@ -34,9 +34,9 @@ <h1>Upgrade Guide and API Changes</h1>
</div>
<p>
- Upgrading from any older version to {{fullDotVersion}} is possible: if
upgrading from 2.3 or below, you will need to do two rolling bounces, where
during the first rolling bounce phase you set the config
<code>upgrade.from="older version"</code>
- (possible values are <code>"0.10.0" - "2.3"</code>) and during the
second you remove it. This is required to safely upgrade to the new cooperative
rebalancing protocol of the embedded consumer. Note that you will remain using
the old eager
- rebalancing protocol if you skip or delay the second rolling bounce,
but you can safely switch over to cooperative at any time once the entire group
is on 2.4+ by removing the config value and bouncing. For more details please
refer to
+ Upgrading from any older version to {{fullDotVersion}} is possible: if
upgrading from 3.3 or below, you will need to do two rolling bounces, where
during the first rolling bounce phase you set the config
<code>upgrade.from="older version"</code>
Review comment:
Right! Fixed in cd8ba1848cefca1c89b4011e9d0716b8534f6782.
##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -636,7 +684,10 @@
"Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" +
UPGRADE_FROM_0101 + "\", \"" +
UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" +
UPGRADE_FROM_10 + "\", \"" +
UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" +
UPGRADE_FROM_21 + "\", \"" +
- UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from
the corresponding old version).";
Review comment:
Good catch! Fixed in cd8ba1848cefca1c89b4011e9d0716b8534f6782.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -62,6 +71,42 @@ public void setIfUnset(final SerdeGetter getter) {
}
}
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean
isKey) {
+ this.upgradeFromV0 = upgradeFromV0(configs);
+ }
+
+ private static boolean upgradeFromV0(final Map<String, ?> configs) {
+ final Object upgradeFrom =
configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+ if (!(upgradeFrom instanceof String)) {
Review comment:
Replaced with a not-null check in
8ebbaaf38ee79642ccb322694de7556f260f93ca.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -60,6 +63,42 @@ public void setIfUnset(final SerdeGetter getter) {
}
}
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean
isKey) {
+ this.upgradeFromV0 = upgradeFromV0(configs);
+ }
+
+ private static boolean upgradeFromV0(final Map<String, ?> configs) {
+ final Object upgradeFrom =
configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+ if (!(upgradeFrom instanceof String)) {
Review comment:
Replaced with a not-null check in
https://github.com/apache/kafka/commit/8ebbaaf38ee79642ccb322694de7556f260f93ca.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]