[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton updated KAFKA-12463:
----------------------------------
    Labels: needs-kip  (was: )

> Update default consumer partition assignor for sink tasks
> ---------------------------------------------------------
>
>                 Key: KAFKA-12463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>              Labels: needs-kip
>
> Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
> that comes with several out-of-the-box implementations, including the
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
> and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured for a consumer, the {{RangeAssignor}}
> is used by default. Although this assignor has some benefits, including
> stability of assignment across generations and simplicity of design, it
> comes with a major drawback: the number of active consumers in a group is
> limited to the number of partitions in the topic(s) with the most
> partitions. As a worst-case example, in a consumer group where every member
> is subscribed to ten topics that each have one partition, only one member of
> that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
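To make the worst case concrete, the following is a small, self-contained simulation. It is a simplified model of range-style and round-robin-style assignment written only for illustration: it does not use the Kafka client library, and the class and method names are hypothetical. With ten single-partition topics and ten tasks, the range-style split hands every partition to the first task, while the round-robin-style split gives each task one partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignorComparison {

    // Range-style (simplified): each topic is split on its own across the sorted
    // consumers, so every single-partition topic goes to the first consumer.
    // Assumes partitionsPerTopic iterates in a stable order (e.g. a TreeMap).
    static Map<String, List<String>> rangeAssign(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : sorted) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int perConsumer = topic.getValue() / sorted.size();
            int extra = topic.getValue() % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                // The first `extra` consumers each take one additional partition.
                int start = i * perConsumer + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return assignment;
    }

    // Round-robin-style (simplified): all partitions of all topics are dealt out in turn.
    static Map<String, List<String>> roundRobinAssign(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : sorted) {
            assignment.put(consumer, new ArrayList<>());
        }
        int next = 0;
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                String consumer = sorted.get(next % sorted.size());
                assignment.get(consumer).add(topic.getKey() + "-" + p);
                next++;
            }
        }
        return assignment;
    }

    // Counts consumers that received at least one partition.
    static long activeConsumers(Map<String, List<String>> assignment) {
        return assignment.values().stream().filter(parts -> !parts.isEmpty()).count();
    }

    public static void main(String[] args) {
        List<String> tasks = new ArrayList<>();
        Map<String, Integer> topics = new TreeMap<>();
        for (int i = 0; i < 10; i++) {
            tasks.add("task-" + i);
            topics.put("topic-" + i, 1); // ten topics, one partition each
        }
        System.out.println("range active: " + activeConsumers(rangeAssign(tasks, topics)));
        System.out.println("round-robin active: " + activeConsumers(roundRobinAssign(tasks, topics)));
    }
}
```

Running {{main}} should print {{range active: 1}} followed by {{round-robin active: 10}}.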
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work, as consumers will still perform eager rebalancing as long as at
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
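The constraint in the note can be modeled roughly as follows. This is a sketch assuming the consumer intersects the rebalance protocols supported by every configured assignor and uses cooperative rebalancing only if all of them support it; the {{Assignor}} type and the constants here are hypothetical stand-ins, not the client's actual classes.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class RebalanceProtocolSketch {

    enum RebalanceProtocol { EAGER, COOPERATIVE }

    // Hypothetical stand-in for a partition assignor: a name plus the protocols it supports.
    static final class Assignor {
        final String name;
        final Set<RebalanceProtocol> supported;

        Assignor(String name, Set<RebalanceProtocol> supported) {
            this.name = name;
            this.supported = supported;
        }
    }

    static final Assignor RANGE = new Assignor("range", EnumSet.of(RebalanceProtocol.EAGER));
    static final Assignor COOPERATIVE_STICKY = new Assignor(
            "cooperative-sticky", EnumSet.of(RebalanceProtocol.EAGER, RebalanceProtocol.COOPERATIVE));

    // Only protocols supported by every configured assignor are usable;
    // COOPERATIVE is chosen only when it survives the intersection.
    static RebalanceProtocol effectiveProtocol(List<Assignor> assignors) {
        Set<RebalanceProtocol> common = EnumSet.allOf(RebalanceProtocol.class);
        for (Assignor assignor : assignors) {
            common.retainAll(assignor.supported);
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("configured assignors share no rebalance protocol");
        }
        return common.contains(RebalanceProtocol.COOPERATIVE)
                ? RebalanceProtocol.COOPERATIVE
                : RebalanceProtocol.EAGER;
    }

    public static void main(String[] args) {
        System.out.println(effectiveProtocol(List.of(COOPERATIVE_STICKY)));
        System.out.println(effectiveProtocol(List.of(COOPERATIVE_STICKY, RANGE)));
    }
}
```

In this model the first call prints {{COOPERATIVE}} and the second prints {{EAGER}}: listing the {{RangeAssignor}} alongside the {{CooperativeStickyAssignor}} pulls the consumer back to eager rebalancing.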
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
> introduced the {{CooperativeStickyAssignor}}, which seeks to provide a
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters." As Connect and 
> the tooling around it mature and automatic restarts of failed tasks become
> more popular, care should be taken to ensure that the consumer group churn 
> created by restarting one or more tasks doesn't compromise the availability 
> of other tasks by forcing them to temporarily yield up all of their 
> partitions just to reclaim them after a rebalance has completed.
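The difference between yielding all partitions and yielding only the ones that move can be sketched as follows. This is a simplified model that assumes each member already knows its target assignment; the real cooperative protocol revokes moving partitions in one rebalance and hands them out in a follow-up rebalance, which this sketch does not model. The class, method, and topic names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

public class RevocationSketch {

    // Eager model: a member gives up everything it owns before the rebalance
    // (the target assignment is ignored) and only gets partitions back afterwards.
    static Set<String> eagerRevoked(Set<String> owned, Set<String> target) {
        return new TreeSet<>(owned);
    }

    // Cooperative model: a member gives up only the partitions that are moving
    // elsewhere and keeps processing the rest throughout the rebalance.
    static Set<String> cooperativeRevoked(Set<String> owned, Set<String> target) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(target);
        return revoked;
    }

    public static void main(String[] args) {
        // A healthy task owns three partitions; after another task restarts and
        // rejoins, the healthy task should keep only two of them.
        Set<String> owned = Set.of("orders-0", "orders-1", "orders-2");
        Set<String> target = Set.of("orders-0", "orders-1");
        System.out.println("eager revokes: " + eagerRevoked(owned, target));
        System.out.println("cooperative revokes: " + cooperativeRevoked(owned, target));
    }
}
```

Running {{main}} should print {{eager revokes: [orders-0, orders-1, orders-2]}} and {{cooperative revokes: [orders-2]}}: under the eager model the healthy task pauses all three partitions, while under the cooperative model it keeps processing {{orders-0}} and {{orders-1}} throughout.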
> With that in mind, we should alter the default consumer configuration for 
> sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this 
> in a backwards-compatible fashion that also enables rolling upgrades, this 
> should be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
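Why listing both assignors allows a rolling upgrade can be illustrated with a simplified model of how a group settles on a strategy: only strategies supported by every member are eligible, and each member votes for its most-preferred eligible strategy. This is a sketch under that assumption (tie-breaking and other coordinator details are ignored), and the class and method names are hypothetical. It also shows why a group whose members share no strategy at all cannot form, which is what the workaround's advice to list both the new and the current strategies is meant to avoid.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class GroupProtocolSketch {

    static final String COOP = "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";
    static final String RANGE = "org.apache.kafka.clients.consumer.RangeAssignor";

    // Each inner list is one member's strategies in preference order.
    static String selectStrategy(List<List<String>> members) {
        // Keep only the strategies that every member supports.
        Set<String> candidates = new LinkedHashSet<>(members.get(0));
        for (List<String> member : members) {
            candidates.retainAll(member);
        }
        if (candidates.isEmpty()) {
            // Roughly the point at which a real group would reject the joining member.
            throw new IllegalStateException("no strategy supported by every member");
        }
        // Each member votes for its most-preferred eligible strategy.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> member : members) {
            for (String preferred : member) {
                if (candidates.contains(preferred)) {
                    votes.merge(preferred, 1, Integer::sum);
                    break;
                }
            }
        }
        String winner = null;
        for (Map.Entry<String, Integer> entry : votes.entrySet()) {
            if (winner == null || entry.getValue() > votes.get(winner)) {
                winner = entry.getKey();
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        List<String> upgraded = List.of(COOP, RANGE);
        List<String> legacy = List.of(RANGE);
        System.out.println(selectStrategy(List.of(upgraded, legacy, legacy)));     // mid-upgrade
        System.out.println(selectStrategy(List.of(upgraded, upgraded, upgraded))); // fully upgraded
    }
}
```

In this model the first call prints the {{RangeAssignor}} class name because a legacy member is still in the group, and the second prints the {{CooperativeStickyAssignor}} class name once every member lists it first.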
> Importantly, this setting will only be a default, and any user-specified
> overrides, either in the *worker config*:
> {code:java}
> consumer.partition.assignment.strategy=<user-specified strategy>
> {code}
> or in the *connector config*:
> {code:java}
> "consumer.override.partition.assignment.strategy": "<user-specified strategy>"
> {code}
> will still be respected.
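The precedence described above can be sketched as three layers of configuration where later layers win. This is a hypothetical helper, not the {{Worker}}'s actual code: it assumes the framework default is applied first, then worker properties with the {{consumer.}} prefix, then connector properties with the {{consumer.override.}} prefix, and it ignores the {{connector.client.config.override.policy}} check and every other consumer setting the {{Worker}} applies.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConsumerConfigSketch {

    static final String STRATEGY = "partition.assignment.strategy";
    static final String DEFAULT_STRATEGY =
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
            + "org.apache.kafka.clients.consumer.RangeAssignor";

    // Copies every entry whose key starts with the prefix, with the prefix stripped.
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Hypothetical helper: later layers overwrite earlier ones, so the proposed
    // default survives only when neither the worker nor the connector sets the strategy.
    // The client override policy check is deliberately omitted.
    static Map<String, String> sinkConsumerConfig(Map<String, String> workerProps, Map<String, String> connectorProps) {
        Map<String, String> config = new HashMap<>();
        config.put(STRATEGY, DEFAULT_STRATEGY);                          // 1. proposed framework default
        config.putAll(withPrefix(workerProps, "consumer."));              // 2. worker config
        config.putAll(withPrefix(connectorProps, "consumer.override.")); // 3. connector config
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> worker = Map.of(
                "consumer.partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        System.out.println(sinkConsumerConfig(Map.of(), Map.of()).get(STRATEGY));
        System.out.println(sinkConsumerConfig(worker, Map.of()).get(STRATEGY));
    }
}
```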
> This improvement is viable as far back as -2.3- 2.4, when the
> {{CooperativeStickyAssignor}} was introduced, but, given that it is not a bug
> fix, it should only be applied to the Connect framework in an upcoming minor
> release.
> h3. Workaround: manually setting the partition assignment strategy
> There is a simple workaround to achieve the same behavior in releases 2.4 and 
> later that don't include this improvement: manually override either a 
> connector configuration or an entire worker configuration.
> In order to avoid task failures while the connector is being reconfigured, it 
> is highly recommended that the consumer be configured with a list of both the 
> new and the current partition assignment strategies, instead of just the new 
> partition assignment strategy.
>  
> For example, to update a connector formerly configured to use the 
> {{RangeAssignor}} strategy to instead use the {{RoundRobinAssignor}} 
> strategy, add the following to the connector configuration:
> {code:java}
> "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RoundRobinAssignor, org.apache.kafka.clients.consumer.RangeAssignor"
> {code}
> Note that this will require per-connector client overrides to be enabled on 
> the worker, which can be accomplished by including this in the worker's 
> configuration:
> {code:java}
> connector.client.config.override.policy=ALL
> {code}
> And to update an entire worker formerly configured to use the 
> {{RangeAssignor}} strategy to instead use the {{RoundRobinAssignor}} 
> strategy, add the following to the worker configuration:
> {code:java}
> consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
