This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
     new ee4f65d Explain the separate upgrade paths for consumer groups and Streams (#7516)

ee4f65d is described below

commit ee4f65dfa5bd2a14e17805734be3d9e1fdfc25a2
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Wed Oct 16 16:09:09 2019 -0700

    Explain the separate upgrade paths for consumer groups and Streams (#7516)

    Document the upgrade path for the consumer and for Streams (note that they differ significantly).

    Needs to be cherry-picked to 2.4

    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 docs/streams/upgrade-guide.html | 17 +++++++++++------
 docs/upgrade.html               | 25 +++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1cc5b50..2de7917 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -34,10 +34,10 @@
  </div>
  <p>
- Upgrading from any older version to {{fullDotVersion}} is possible: (1) if you are upgrading from 2.0.x to {{fullDotVersion}} then a single rolling bounce is needed to swap in the new jar,
- (2) if you are upgrading from older versions than 2.0.x in the online mode, you would need two rolling bounces where
- the first rolling bounce phase you need to set config <code>upgrade.from="older version"</code> (possible values are <code>"0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", and "1.1"</code>)
- (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>):
+ Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
+ (possible values are <code>"0.10.0" - "2.3"</code>) and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will keep using the old eager
+ rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
+ <a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>:
  </p>
  <ul>
  <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to the version from which it is being upgraded.</li>
@@ -75,11 +75,16 @@
  <h3><a id="streams_api_changes_240" href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
  <!-- Placeholder KIP-213 -->
- <!-- Placeholder KIP-307 -->
  <!-- Placeholder KIP-479 -->
- <!-- Placeholder KIP-429 -->
+ <p>
+ With the introduction of incremental cooperative rebalancing, Streams no longer requires that all tasks be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance only those tasks which are to be migrated to another consumer
+ for overall load balance will need to be closed and revoked. This changes the semantics of the <code>StateListener</code> a bit, as it will not necessarily transition to <code>REBALANCING</code> at the beginning of a rebalance anymore. Note that
+ this means IQ will now be available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, we will continue to actively restore the state stores and/or process
+ standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures all tasks are safely distributed. For details, please see
+ <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol">KIP-429</a>.
+ </p>
  <p>
  The 2.4.0 release contains newly added and reworked metrics.
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0c69336..6d9f702 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -45,6 +45,31 @@
  The old overloaded functions are deprecated and we would recommend users to make their code changes to leverage the new methods (details can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-520%3A+Add+overloaded+Consumer%23committed+for+batching+partitions">KIP-520</a>).
  </li>
+ <li>We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows consumers to keep all of their assigned partitions during a rebalance
+ and at the end revoke only those which must be migrated to another consumer for overall cluster balance. The <code>ConsumerCoordinator</code> will choose the latest <code>RebalanceProtocol</code>
+ that is commonly supported by all of the consumer's supported assignors. You can use the new built-in <code>CooperativeStickyAssignor</code> or plug in your own custom cooperative assignor. To do
+ so you must implement the <code>ConsumerPartitionAssignor</code> interface and include <code>RebalanceProtocol.COOPERATIVE</code> in the list returned by <code>ConsumerPartitionAssignor#supportedProtocols</code>.
+ Your custom assignor can then leverage the <code>ownedPartitions</code> field in each consumer's <code>Subscription</code> to give partitions back to their previous owners whenever possible. Note that when
+ a partition is to be reassigned to another consumer, it <em>must</em> be removed from the new assignment until it has been revoked from its original owner. Any consumer that has to revoke a partition will trigger
+ a followup rebalance to allow the revoked partition to safely be assigned to its new owner. See the
+ <a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html">ConsumerPartitionAssignor RebalanceProtocol javadocs</a> for more information.
+ <br>
+ To upgrade from the old (eager) protocol, which always revokes all partitions before rebalancing, to cooperative rebalancing, you must follow a specific upgrade path to get all clients on the same <code>ConsumerPartitionAssignor</code>
+ that supports the cooperative protocol. This can be done with two rolling bounces, using the <code>CooperativeStickyAssignor</code> as an example: during the first one, add "cooperative-sticky" to the list of supported assignors
+ for each member (without removing the previous assignor -- note that if previously using the default, you must include that explicitly as well). You then bounce and/or upgrade it.
+ Once the entire group is on 2.4+ and all members have "cooperative-sticky" among their supported assignors, remove the other assignor(s) and perform a second rolling bounce so that by the end all members support only the
+ cooperative protocol. For further details on the cooperative rebalancing protocol and upgrade path, see <a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>.
+ </li>
+ <li>There are some behavioral changes to the <code>ConsumerRebalanceListener</code>, as well as a new API. Exceptions thrown during any of the listener's three callbacks will no longer be swallowed, and will instead be re-thrown
+ all the way up to the <code>Consumer.poll()</code> call. The <code>onPartitionsLost</code> method has been added to allow users to react to abnormal circumstances where a consumer may have lost ownership of its partitions
+ (such as a missed rebalance) and cannot commit offsets. By default, this will simply call the existing <code>onPartitionsRevoked</code> API to align with previous behavior. Note however that <code>onPartitionsLost</code> will not
+ be called when the set of lost partitions is empty. This means that no callback will be invoked at the beginning of the first rebalance of a new consumer joining the group.
+ <br>
+ The semantics of the <code>ConsumerRebalanceListener's</code> callbacks are further changed when following the cooperative rebalancing protocol described above. In addition to <code>onPartitionsLost</code>, <code>onPartitionsRevoked</code>
+ will also never be called when the set of revoked partitions is empty. The callback will generally be invoked only at the end of a rebalance, and only on the set of partitions that are being moved to another consumer. The
+ <code>onPartitionsAssigned</code> callback will however always be called, even with an empty set of partitions, as a way to notify users of a rebalance event (this is true for both cooperative and eager). For details on
+ the new callback semantics, see the <a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html">ConsumerRebalanceListener javadocs</a>.
+ </li>
  </ul>
  <h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>
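The Streams upgrade path in this commit comes down to running the same application with two slightly different configurations. The sketch below is a minimal illustration under stated assumptions: it uses the plain string keys upgrade.from and application.id instead of the StreamsConfig constants so that it compiles without kafka-streams, the class StreamsUpgradePhases and its methods are hypothetical, and the set of accepted values is our reading of the documented range "0.10.0" - "2.3".

```java
import java.util.Properties;
import java.util.Set;

/**
 * Hypothetical helper (not part of Kafka Streams) that builds the config for each
 * phase of the two-rolling-bounce upgrade to 2.4. Plain string keys are used so the
 * sketch has no kafka-streams dependency.
 */
final class StreamsUpgradePhases {
    // Assumption: the documented range "0.10.0" - "2.3" spelled out release by release.
    static final Set<String> UPGRADE_FROM_VALUES = Set.of(
        "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3");

    private StreamsUpgradePhases() {}

    /** First rolling bounce: run the new jar with upgrade.from set to the old version. */
    static Properties firstBounce(Properties base, String fromVersion) {
        if (!UPGRADE_FROM_VALUES.contains(fromVersion)) {
            throw new IllegalArgumentException("unsupported upgrade.from value: " + fromVersion);
        }
        Properties props = new Properties();
        props.putAll(base);                          // copy the application's normal settings
        props.setProperty("upgrade.from", fromVersion);
        return props;
    }

    /** Second rolling bounce: identical config with upgrade.from removed. */
    static Properties secondBounce(Properties firstBounceProps) {
        Properties props = new Properties();
        props.putAll(firstBounceProps);
        props.remove("upgrade.from");                // lets the group move to cooperative rebalancing
        return props;
    }
}
```

Each configuration is rolled out one instance at a time; the second is only applied once every instance runs 2.4. Leaving the first configuration in place is safe but keeps the group on the eager protocol.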
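The consumer upgrade path works because each member settles on the latest rebalance protocol that all of its configured assignors support. The toy model below illustrates that rule; it is not the ConsumerCoordinator source. It assumes RangeAssignor (the 2.4 default, named "range") supports only EAGER, while CooperativeStickyAssignor ("cooperative-sticky") supports both protocols. In configuration terms, the first bounce would set partition.assignment.strategy to org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor, and the second bounce would keep only CooperativeStickyAssignor.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Toy model of protocol selection (not Kafka code): intersect the protocols supported
 * by every configured assignor and use the latest one that remains.
 */
final class ProtocolSelection {
    // Declaration order doubles as "age": later constants are newer protocols.
    enum RebalanceProtocol { EAGER, COOPERATIVE }

    // Assumption: per-assignor protocol support as described in the lead-in.
    static final Map<String, Set<RebalanceProtocol>> SUPPORTED = Map.of(
        "range", Set.of(RebalanceProtocol.EAGER),
        "cooperative-sticky", Set.of(RebalanceProtocol.EAGER, RebalanceProtocol.COOPERATIVE));

    private ProtocolSelection() {}

    static RebalanceProtocol select(List<String> assignors) {
        if (assignors.isEmpty()) throw new IllegalArgumentException("no assignors configured");
        EnumSet<RebalanceProtocol> common = EnumSet.allOf(RebalanceProtocol.class);
        for (String name : assignors) {
            Set<RebalanceProtocol> supported = SUPPORTED.get(name);
            if (supported == null) throw new IllegalArgumentException("unknown assignor: " + name);
            common.retainAll(supported);             // keep only protocols every assignor supports
        }
        if (common.isEmpty()) throw new IllegalStateException("assignors share no rebalance protocol");
        RebalanceProtocol latest = null;
        for (RebalanceProtocol p : common) latest = p; // EnumSet iterates in declaration order
        return latest;
    }
}
```

With this model, select(List.of("cooperative-sticky", "range")) returns EAGER, which is why the first bounce alone does not change protocols, and select(List.of("cooperative-sticky")) returns COOPERATIVE once the second bounce removes "range".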
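The two cooperative rules in upgrade.html can be sketched as a toy model. First, a partition that moves to another consumer is withheld from the new assignment until its current owner revokes it. Second, a member that revokes anything triggers a follow-up rebalance. Partitions here are plain strings, the consumer ids are made up, and withholdMoving and onRebalanceComplete are hypothetical helpers, not Kafka APIs. The callback half follows the documented cooperative ConsumerRebalanceListener semantics: onPartitionsRevoked fires only for a non-empty set, and onPartitionsAssigned always fires. We assume onPartitionsAssigned receives only the newly added partitions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model (not Kafka code) of one cooperative rebalance round. */
final class CooperativeRound {
    private CooperativeRound() {}

    /** Target assignment minus any partition still owned by a different consumer. */
    static Map<String, Set<String>> withholdMoving(Map<String, Set<String>> owned,
                                                   Map<String, Set<String>> target) {
        Map<String, String> ownerOf = new HashMap<>();
        owned.forEach((consumer, parts) -> parts.forEach(p -> ownerOf.put(p, consumer)));
        Map<String, Set<String>> result = new HashMap<>();
        target.forEach((consumer, parts) -> {
            Set<String> kept = new TreeSet<>();
            for (String p : parts) {
                String current = ownerOf.get(p);
                if (current == null || current.equals(consumer)) kept.add(p); // free or staying put
            }
            result.put(consumer, kept);
        });
        return result;
    }

    /** Listener calls a member would see when a cooperative rebalance completes. */
    static final class Callbacks {
        final Set<String> revoked;        // null: onPartitionsRevoked is not invoked
        final Set<String> assigned;       // never null: onPartitionsAssigned always fires
        final boolean followUpRebalance;  // revoking anything triggers another rebalance

        Callbacks(Set<String> revoked, Set<String> assigned, boolean followUpRebalance) {
            this.revoked = revoked;
            this.assigned = assigned;
            this.followUpRebalance = followUpRebalance;
        }
    }

    static Callbacks onRebalanceComplete(Set<String> owned, Set<String> assignment) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(assignment);               // partitions moving to someone else
        Set<String> added = new TreeSet<>(assignment);
        added.removeAll(owned);                      // partitions this member did not own before
        boolean mustRevoke = !revoked.isEmpty();
        return new Callbacks(mustRevoke ? revoked : null, added, mustRevoke);
    }
}
```

In a scenario where c1 owns t-0 and t-1, and the target gives t-0 to c1 and t-1 to c2, the first round gives c2 nothing. c1 then revokes t-1 and rejoins, and the follow-up round hands t-1 to c2. Meanwhile, t-0 stays with c1 throughout.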
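The default behavior of the new onPartitionsLost callback has two parts: it delegates to onPartitionsRevoked, and it is skipped entirely for an empty set. Both can be shown with a toy interface. ToyRebalanceListener, LostDispatch and RecordingListener are hypothetical stand-ins, not the Kafka ConsumerRebalanceListener, so the sketch runs without kafka-clients on the classpath.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a rebalance listener; not the Kafka interface. */
interface ToyRebalanceListener {
    void onPartitionsRevoked(Collection<String> partitions);

    void onPartitionsAssigned(Collection<String> partitions);

    // Mirrors the documented default: treat lost partitions as if they were revoked.
    default void onPartitionsLost(Collection<String> partitions) {
        onPartitionsRevoked(partitions);
    }
}

final class LostDispatch {
    private LostDispatch() {}

    /** Invokes onPartitionsLost only for a non-empty set, as the upgrade notes describe. */
    static void partitionsLost(ToyRebalanceListener listener, Set<String> lost) {
        if (!lost.isEmpty()) listener.onPartitionsLost(lost);
    }
}

/** Example listener that only records which callbacks ran. */
final class RecordingListener implements ToyRebalanceListener {
    final List<String> calls = new ArrayList<>();

    public void onPartitionsRevoked(Collection<String> partitions) { calls.add("revoked" + partitions); }

    public void onPartitionsAssigned(Collection<String> partitions) { calls.add("assigned" + partitions); }
}
```

Lost partitions may already belong to another member, and their offsets cannot be committed. For that reason, a real listener that commits offsets in onPartitionsRevoked would usually override onPartitionsLost rather than rely on the default.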