[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton updated KAFKA-12463:
----------------------------------
Labels: needs-kip (was: )

Update default consumer partition assignor for sink tasks
---------------------------------------------------------

Key: KAFKA-12463
URL: https://issues.apache.org/jira/browse/KAFKA-12463
Project: Kafka
Issue Type: Improvement
Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton
Priority: Major
Labels: needs-kip

Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations, including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured for a consumer, the {{RangeAssignor}} is used by default. Although this assignor has some benefits, including stability of assignment across generations and simplicity of design, it comes with a major drawback: because it assigns the partitions of each topic independently of every other topic, the number of active consumers in a group is limited to the number of partitions in the subscribed topic with the most partitions. For an example of the worst case, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions.
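To make the per-topic behavior concrete, here is a simplified sketch of range-style assignment. It is not the actual {{RangeAssignor}} source: it only models the rule that each topic's partitions are split into contiguous ranges over the sorted member IDs, independently of every other topic. The class name {{RangeAssignmentSketch}}, the member IDs, and the topic names are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of range-style assignment (not the real RangeAssignor source).
public class RangeAssignmentSketch {

    // Assigns each topic's partitions independently, splitting them into contiguous
    // ranges over the members sorted by ID. Returns member ID -> "topic-partition" names.
    static Map<String, List<String>> assign(List<String> members, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(members);
        sorted.sort(Comparator.naturalOrder());

        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : sorted) {
            assignment.put(member, new ArrayList<>());
        }

        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int numPartitions = topic.getValue();
            int perMember = numPartitions / sorted.size();
            int withExtra = numPartitions % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                // The first `withExtra` members each get one partition more than the rest.
                int start = perMember * i + Math.min(i, withExtra);
                int length = perMember + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return assignment;
    }

    // Counts members that received at least one partition.
    static long activeMembers(Map<String, List<String>> assignment) {
        return assignment.values().stream().filter(partitions -> !partitions.isEmpty()).count();
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        Map<String, Integer> topics = new TreeMap<>();
        for (int i = 0; i < 10; i++) {
            members.add("task-" + i);    // ten sink tasks...
            topics.put("topic-" + i, 1); // ...reading ten single-partition topics
        }
        Map<String, List<String>> assignment = assign(members, topics);
        System.out.println("Active members: " + activeMembers(assignment)); // prints "Active members: 1"
    }
}
{code}

Because every topic is handled on its own, the first member in sorted order receives the single partition of each topic, so {{task-0}} ends up with all ten partitions while the other nine members stay idle. With one topic of ten partitions instead, the same sketch gives every member exactly one partition.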
This can produce counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from a collection of topics with a total of N topic partitions, but some tasks end up idle and never process any data.

h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will not work, as consumers will still perform eager rebalancing as long as at least one of the partition assignors they are configured with does not support cooperative rebalancing. KAFKA-12487 should also be addressed before configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during a rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters." As Connect and the tooling around it mature and automatic restarts of failed tasks become more popular, care should be taken to ensure that the consumer group churn created by restarting one or more tasks doesn't compromise the availability of other tasks by forcing them to temporarily give up all of their partitions, only to reclaim them after the rebalance has completed.

With that in mind, we should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}.
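The availability concern about restarts can be illustrated with a toy model of what a healthy task gives up when another task in its group restarts. This is a hypothetical sketch (the class and method names are made up for illustration), assuming a sticky assignor hands the healthy task back the same partitions: under the eager protocol a member revokes everything it owns before rejoining, whereas under the cooperative protocol from KIP-429 it revokes only the partitions missing from its new assignment.

{code:java}
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model of partition revocation during a rebalance; names are illustrative only.
public class RevocationSketch {

    // Eager protocol: a member revokes everything it owns before rejoining the group.
    // The new assignment is ignored because it is not known yet at revocation time.
    static Set<String> eagerRevoked(Set<String> owned, Set<String> newAssignment) {
        return new TreeSet<>(owned);
    }

    // Cooperative protocol (KIP-429): a member keeps its partitions while rejoining and
    // afterwards revokes only those that are missing from its new assignment.
    static Set<String> cooperativeRevoked(Set<String> owned, Set<String> newAssignment) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        // A healthy task owns two partitions. Another task restarts, and a sticky
        // assignor hands the healthy task the same two partitions again.
        Set<String> owned = Set.of("orders-0", "orders-1");
        Set<String> newAssignment = Set.of("orders-0", "orders-1");

        System.out.println("eager revokes: " + eagerRevoked(owned, newAssignment));             // prints "eager revokes: [orders-0, orders-1]"
        System.out.println("cooperative revokes: " + cooperativeRevoked(owned, newAssignment)); // prints "cooperative revokes: []"
    }
}
{code}

In the cooperative case the healthy task keeps processing both partitions throughout the rebalance instead of pausing and reclaiming them. As the NOTE above points out, this only applies once the consumers actually use the cooperative rebalance protocol.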
In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and will then switch over to the new {{CooperativeStickyAssignor}} automatically.

Importantly, this setting will only be a default, and any user-specified overrides, either in the *worker config*:
{code:java}
consumer.partition.assignment.strategy=<user-specified strategy>
{code}
or in the *connector config*:
{code:java}
"consumer.override.partition.assignment.strategy": "<user-specified strategy>"
{code}
will still be respected.

This improvement is viable as far back as -2.3- 2.4, when the {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug fix, it should only be applied to the Connect framework in an upcoming minor release.

h3. Workaround: manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in releases 2.4 and later that don't include this improvement: manually override either a connector configuration or an entire worker configuration.

In order to avoid task failures while the connector is being reconfigured, it is highly recommended that the consumer be configured with a list of both the new and the current partition assignment strategies, instead of just the new partition assignment strategy.
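Both the rolling-upgrade behavior and the recommendation to list the old and new strategies together follow from how a consumer group agrees on a single assignor. The following is a simplified model of that selection, assuming it works roughly as follows: only assignors listed by every member are candidates, each member votes for the first candidate in its own preference order, and the candidate with the most votes wins. The class name, method name, and tie-breaking rule are hypothetical, and the sketch says nothing about whether the group then rebalances eagerly or cooperatively (see the NOTE under Proposed Change). The strings are the names the built-in assignors report: {{range}}, {{roundrobin}}, and {{cooperative-sticky}}.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of how a consumer group settles on one assignor.
public class ProtocolSelectionSketch {

    // Each inner list is one member's partition.assignment.strategy, in preference order.
    static String select(List<List<String>> memberPreferences) {
        // Candidates: assignors listed by every member, kept in the first member's order.
        List<String> candidates = new ArrayList<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            candidates.retainAll(prefs);
        }
        if (candidates.isEmpty()) {
            // Real consumers fail to join the group in this situation.
            throw new IllegalStateException("no assignor is supported by every member");
        }

        // Each member votes once, for its most preferred candidate.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (String candidate : candidates) {
            votes.put(candidate, 0);
        }
        for (List<String> prefs : memberPreferences) {
            for (String p : prefs) {
                if (votes.containsKey(p)) {
                    votes.merge(p, 1, Integer::sum);
                    break;
                }
            }
        }

        // Most votes wins; ties go to the earlier candidate (an assumption of this sketch).
        String winner = candidates.get(0);
        for (Map.Entry<String, Integer> entry : votes.entrySet()) {
            if (entry.getValue() > votes.get(winner)) {
                winner = entry.getKey();
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        List<String> upgraded = List.of("cooperative-sticky", "range");
        List<String> notUpgraded = List.of("range");

        // Mid-upgrade: only "range" is common to every member, so the group keeps using it.
        System.out.println(select(List.of(upgraded, notUpgraded, upgraded))); // prints "range"
        // Fully upgraded: every member prefers "cooperative-sticky".
        System.out.println(select(List.of(upgraded, upgraded, upgraded)));    // prints "cooperative-sticky"
    }
}
{code}

If one member were reconfigured with only {{roundrobin}} while the others still listed only {{range}}, there would be no common candidate. Real consumers in that situation are rejected by the group coordinator with an inconsistent-group-protocol error, which is the kind of task failure that listing both strategies avoids.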
For example, to update a connector formerly configured to use the {{RangeAssignor}} strategy to instead use the {{RoundRobinAssignor}} strategy, add the following to the connector configuration:
{code:java}
"consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor"
{code}
Note that this requires per-connector client overrides to be enabled on the worker, which can be accomplished by including this in the worker's configuration:
{code:java}
connector.client.config.override.policy=All
{code}
To update an entire worker formerly configured to use the {{RangeAssignor}} strategy to instead use the {{RoundRobinAssignor}} strategy, add the following to the worker configuration:
{code:java}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)