[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304452#comment-17304452
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:
------------------------------------------------

Out of curiosity, why use the RoundRobinAssignor rather than the StickyAssignor 
if you want to avoid cooperative? I'm not a Connect expert so maybe there's 
some nuance here, but the StickyAssignor is generally preferred (we were 
planning to make that the default in 3.0 before the CooperativeStickyAssignor 
came along)

> Update default consumer partition assignor for sink tasks
> ---------------------------------------------------------
>
>                 Key: KAFKA-12463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically. But, this setting will 
> be overwritten by any user-specified 
> {{consumer.partition.assignment.strategy}} property in the worker 
> configuration, and by any user-specified 
> {{consumer.override.partition.assignment.strategy}} property in a sink 
> connector configuration when per-connector client overrides is enabled in the 
> worker config with {{connector.client.config.override.policy=ALL}}.
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, should only be applied to the Connect framework in an upcoming minor 
> release.
> h3. Manually setting the partition assignment strategy
> There is a simple workaround to achieve the same behavior in AK releases 2.4 
> and later that don't also include this improvement: either set a value for 
> the {{consumer.partition.assignment.strategy}} property in the *worker 
> configuration, or* set a value for the 
> {{consumer.override.partition.assignment.strategy}} property in one or more 
> *connector configurations* when per-connector client overrides is enabled in 
> the worker config with {{connector.client.config.override.policy=ALL}}.
> In order to avoid task failures while the connector is being reconfigured, it 
> is highly recommended that the consumer be configured with a list of both the 
> new and the current partition assignment strategies, instead of just the new 
> partition assignment strategy. For example, to update a connector configured 
> to use the {{RangeAssignor}} strategy to use the {{RoundRobinAssignor}} 
> strategy instead, add the following to the connector configuration:
> {code:java}
> "consumer.override.partition.assignment.strategy": 
> "org.apache.kafka.clients.consumer.RoundRobinAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor"{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to