[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10336:
---------------------------------
    Affects Version/s: 2.6.0
                       2.3.0
                       2.4.0
                       2.5.0

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> ------------------------------------------------------------------
>
>                 Key: KAFKA-10336
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10336
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>            Reporter: John Roesler
>            Priority: Blocker
>             Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, the active worker may write changelog messages with a higher version 
> number than the standby worker can understand. When the standby worker 
> receives one of these messages, it will throw the exception and shut down its 
> thread.
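> As a concrete example, the situation arises for an application shaped roughly 
> like the following (a minimal sketch using the public Streams API; the 
> topology details are illustrative, and suppression-buffer changelogging is on 
> by default):
> {code:java}
> import java.time.Duration;
> import java.util.Properties;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Suppressed;
> import org.apache.kafka.streams.kstream.TimeWindows;
>
> public class SuppressionWithStandbysExample {
>
>     public static void main(final String[] args) {
>         final StreamsBuilder builder = new StreamsBuilder();
>         builder.stream("input")
>                .groupByKey()
>                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
>                .count()
>                // (B) suppression; (D) its buffer is changelogged by default
>                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>                .toStream()
>                .to("output");
>
>         final Properties props = new Properties();
>         // (C) standby replicas: a standby worker replays the suppression changelog
>         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
>         // ... remaining config (application.id, bootstrap.servers, serdes) omitted ...
>     }
> }
> {code}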
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that combine all of: (A) rolling bounces, (B) 
> suppression, (C) standby replicas, and (D) changelogged suppression buffers. 
> Changing any one of those four variables will prevent the issue from 
> occurring. I would NOT recommend disabling (D), and (B) is probably off the 
> table, since the application logic presumably depends on it. Therefore, your 
> practical choices are to disable standbys (C) or to do a full-cluster bounce 
> instead of a rolling one (A). Personally, I think (A) is the best option.
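> That said, if you do go with (C), disabling standbys for the duration of the 
> upgrade is a one-line change to the same Properties object sketched above:
> {code:java}
> // Workaround (C): temporarily run without hot standbys during the rolling
> // upgrade. Note that 0 is also the default value of this config.
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
> {code}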
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. For example, when upgrading from 2.3 to 2.4, you would set UPGRADE_FROM 
> to "2.3" and then do a rolling upgrade to 2.4. The new (2.4) nodes would 
> continue writing messages in the old (2.3) format, so the still-running old 
> nodes would still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
> Post-bounce, the nodes would start writing in the 2.4 format, which is fine 
> because by that point every member is running 2.4 and can decode 2.4-format 
> messages, even the members that (mid-bounce) are still configured to write in 
> the 2.3 format. After the second rolling bounce, the whole cluster is both 
> running 2.4 and writing in the 2.4 format.
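> Sketched against the same Properties object as above, and assuming the 
> existing upgrade.from config were extended to also cover the suppression 
> changelog format (which is what this part proposes), the two bounces would 
> look roughly like:
> {code:java}
> // Bounce 1: deploy the new (2.4) code, but keep writing old-format messages.
> props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
> // ... rolling-bounce every instance with this setting ...
>
> // Bounce 2: remove the setting so members switch to the 2.4 format.
> props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
> // ... rolling-bounce every instance again ...
> {code}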
> Part 2:
> Managing two rolling bounces can be difficult, so it is also desirable to 
> implement a mechanism for automatically negotiating the schema version 
> internally.
> In fact, this is already present in Streams, and it is called "version 
> probing". Right now, version probing is used to enable the exact same kind of 
> transition from an old-message-format to a new-message-format when both old 
> and new members are in the cluster, but it is only used for the assignment 
> protocol messages (i.e., the formats of the subscription and assignment 
> messages that group members send to each other).
> We can expand the scope of the version that "version probing" negotiates from 
> "assignment protocol version" to a "general protocol version". Then, when the 
> cluster contains mixed-versioned members, the entire cluster will only write 
> changelog (and repartition) messages with the protocol version of the 
> oldest-versioned member.
> With that in place, you would never need to specify UPGRADE_FROM. You'd 
> simply perform rolling upgrades, and Streams would internally negotiate the 
> right protocol/schema versions to write such that all running members can 
> decode them at all times.
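> In other words, the negotiation boils down to something like the following (a 
> hypothetical sketch of the idea, not the actual version-probing code; the 
> names are made up):
> {code:java}
> import java.util.Collection;
>
> // Hypothetical sketch: during assignment, the group settles on the highest
> // version that every member supports, and all members write changelog and
> // repartition data at that negotiated version until the group changes.
> public final class ProtocolVersionNegotiation {
>
>     public static int negotiate(final Collection<Integer> memberSupportedVersions,
>                                 final int latestVersion) {
>         return memberSupportedVersions.stream()
>                 .mapToInt(Integer::intValue)
>                 .min()
>                 .orElse(latestVersion);
>     }
> }
> {code}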
> Part 3:
> Although Part 2 is sufficient to ensure rolling upgrades, it does not allow 
> for downgrades. If you upgrade your whole cluster to 2.4, then later decide 
> you want to go back to 2.3, you will find that the 2.3-versioned nodes crash 
> when attempting to decode changelog messages that had previously been written 
> by 2.4 nodes. Since the changelog messages are by design durable 
> indefinitely, this effectively prevents ever downgrading.
> To solve this last problem, I propose that, although we don't require 
> UPGRADE_FROM, we still allow it. Specifying UPGRADE_FROM=2.3 would cause 
> new-versioned members to set their "max protocol version" in the assignment 
> protocol to 2.3, so version probing would never let the members upgrade their 
> message formats to 2.4. You could run 2.4 as long as you want with 
> UPGRADE_FROM set to 2.3. If any issues arise, you could still downgrade the 
> application to version 2.3.
> Once you're satisfied that 2.4 is working and you won't want to downgrade 
> anymore, you would remove the UPGRADE_FROM config and bounce again. Now, the 
> members will be free to start writing with the latest message format.
> Notes:
>  * No KIP is required, since all the needed mechanisms are already present
>  * As part of completing this work, we should re-enable rolling upgrade tests 
> in streams_application_upgrade_test.py



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
