[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304248#comment-17304248 ]
Randall Hauch commented on KAFKA-12463:
---------------------------------------

Thanks for the response and for logging KAFKA-12487, [~ChrisEgerton].

{quote}
I'd also just like to point out that the goal here is to improve the out-of-the-box behavior of Connect for users; although workarounds are nice to have, the goal here shouldn't be to focus on documenting them but instead, to make them obsolete. If we decide not to improve the default behavior of Connect then we can document this somewhere else that's a little more visible for users as opposed to developers.
{quote}

I agree with you that we should fix the behavior. But the fix will appear only in certain releases, and not all users will be able to upgrade to those releases to get it. Documenting the simple workaround will help users in those situations. I suggested documenting the workaround here to help any users who stumble upon this issue when searching for a potential fix, regardless of whether the workaround is also documented elsewhere.

IIUC, given that KAFKA-12487 is likely more challenging to fix and has not yet been addressed, the shorter-term proposal here is to change the worker to set the following on the consumer configs used by sink connectors:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This works because the {{RoundRobinAssignor}} will be used only after all workers have been upgraded, and it gives us more balanced consumer assignments. Plus, it is backward compatible, since any user-specified value always takes precedence over this new default, whether it is set in the worker config via:
{code:java}
consumer.partition.assignment.strategy=...
{code}
or in a connector config that uses client overrides via:
{code:java}
consumer.override.partition.assignment.strategy=...
{code}
If that is the proposal, WDYT about updating the description to make this clearer?
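To make the rolling-upgrade argument concrete, here is a simplified sketch of how a consumer group settles on one assignor: only assignors listed by every member are candidates, each member votes for the first candidate in its own list, and the candidate with the most votes wins. This models the group coordinator's behavior rather than reproducing broker code; {{AssignorSelection}} and {{selectAssignor}} are hypothetical names, while "range" and "roundrobin" are the names reported by {{RangeAssignor}} and {{RoundRobinAssignor}}.

{code:java}
import java.util.*;

class AssignorSelection {
    /**
     * Simplified model of assignor selection in a consumer group. Only assignors
     * listed by every member are candidates; each member votes for its most
     * preferred candidate and the candidate with the most votes wins.
     * Returns empty when the members have no assignor in common.
     */
    static Optional<String> selectAssignor(List<List<String>> memberPreferences) {
        // Candidates: assignors supported by every member of the group.
        Set<String> candidates = new HashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            candidates.retainAll(prefs);
        }
        // Each member votes for the first candidate in its own preference order.
        Map<String, Integer> votes = new HashMap<>();
        for (List<String> prefs : memberPreferences) {
            prefs.stream()
                 .filter(candidates::contains)
                 .findFirst()
                 .ifPresent(choice -> votes.merge(choice, 1, Integer::sum));
        }
        // Pick the candidate with the most votes (empty if there were no candidates).
        return votes.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        List<String> oldWorker = List.of("range");
        List<String> upgradedWorker = List.of("roundrobin", "range");

        // Mid-upgrade: one old worker remains, so "range" is the only common assignor.
        System.out.println(selectAssignor(List.of(oldWorker, upgradedWorker, upgradedWorker)).orElse("none"));
        // Upgrade complete: every member votes for "roundrobin".
        System.out.println(selectAssignor(List.of(upgradedWorker, upgradedWorker, upgradedWorker)).orElse("none"));
    }
}
{code}

Running {{main}} prints {{range}} and then {{roundrobin}}. If a member listed only {{roundrobin}} while others still listed only {{range}}, there would be no common assignor and {{selectAssignor}} would return an empty result; in a real group that member would be rejected when joining, which is why the workaround in the issue description recommends listing both the new and the current assignor.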
Essentially, I suggest that this issue's description state the problem (that section is good), propose a solution using round robin (mostly reusing your proposed section, but with round robin rather than cooperative), document the workaround, and finally explain why {{RoundRobinAssignor}} was chosen instead of {{CooperativeStickyAssignor}}. If we're on the same page, then I think it's worth updating the PR to implement the proposed fix. I'll say the same in a review of the PR.

> Update default consumer partition assignor for sink tasks
> ---------------------------------------------------------
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Major
>
> Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations, including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
>
> If no partition assignor is configured for a consumer, the {{RangeAssignor}} is used by default. Although this assignor has some benefits, including stability of assignment across generations and simplicity of design, it comes with a major drawback: the number of active consumers in a group is limited to the number of partitions in the topic(s) with the most partitions.
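A simplified simulation makes this drawback easy to see. The sketch assumes every consumer has the same subscription and uses ten consumers reading ten single-partition topics (the worst case the description gives). It mimics the range and round-robin strategies in a few lines and is not the code of Kafka's {{RangeAssignor}} or {{RoundRobinAssignor}}; the class, consumer, and topic names are made up.

{code:java}
import java.util.*;

class AssignorComparison {
    // Simplified range strategy: each topic is split on its own into contiguous
    // ranges over the sorted consumers; leftover partitions go to the first consumers.
    static Map<String, List<String>> range(List<String> consumers, Map<String, Integer> topics) {
        List<String> sorted = sortedCopy(consumers);
        Map<String, List<String>> assignment = emptyAssignment(sorted);
        for (Map.Entry<String, Integer> topic : new TreeMap<>(topics).entrySet()) {
            int perConsumer = topic.getValue() / sorted.size();
            int extra = topic.getValue() % sorted.size();
            int partition = 0;
            for (int i = 0; i < sorted.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int j = 0; j < count; j++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "-" + partition++);
                }
            }
        }
        return assignment;
    }

    // Simplified round-robin strategy (identical subscriptions assumed): deal every
    // partition of every topic across the sorted consumers, one at a time.
    static Map<String, List<String>> roundRobin(List<String> consumers, Map<String, Integer> topics) {
        List<String> sorted = sortedCopy(consumers);
        Map<String, List<String>> assignment = emptyAssignment(sorted);
        int next = 0;
        for (Map.Entry<String, Integer> topic : new TreeMap<>(topics).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                assignment.get(sorted.get(next++ % sorted.size())).add(topic.getKey() + "-" + p);
            }
        }
        return assignment;
    }

    static List<String> sortedCopy(List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        return sorted;
    }

    static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        return assignment;
    }

    // Number of consumers that received at least one partition.
    static long activeConsumers(Map<String, List<String>> assignment) {
        return assignment.values().stream().filter(partitions -> !partitions.isEmpty()).count();
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        Map<String, Integer> topics = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            consumers.add("task-" + i);   // ten sink tasks in one consumer group
            topics.put("topic-" + i, 1);  // ten topics with one partition each
        }
        System.out.println("range: " + activeConsumers(range(consumers, topics)) + " active");
        System.out.println("roundrobin: " + activeConsumers(roundRobin(consumers, topics)) + " active");
    }
}
{code}

Running {{main}} prints {{range: 1 active}} followed by {{roundrobin: 10 active}}. Because the range strategy splits each topic independently, a topic with a single partition always goes to the first consumer in sorted order, so that one consumer ends up with all ten partitions.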
>
> As a worst-case example, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions.
>
> This can produce counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from a collection of topics with a total of N topic partitions, but some tasks end up idle and never process any data.
>
> h3. Proposed Change
>
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will not work, as consumers will still perform eager rebalancing as long as at least one of the partition assignors they are configured with does not support cooperative rebalancing. KAFKA-12487 should also be addressed before configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any sink connectors.*
>
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during a rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters."
>
> We should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}.
> In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and will then switch over to the new {{CooperativeStickyAssignor}} automatically. However, this setting will be overridden by any user-specified {{consumer.partition.assignment.strategy}} property in the worker configuration, and by any user-specified {{consumer.override.partition.assignment.strategy}} property in a sink connector configuration when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}.
>
> This improvement is viable as far back as -2.3- 2.4, when the {{CooperativeStickyAssignor}} was introduced, but since it is not a bug fix, it should only be applied to the Connect framework in an upcoming minor release.
>
> h3. Manually setting the partition assignment strategy
>
> There is a simple workaround to achieve the same behavior in AK releases 2.4 and later that do not include this improvement: either set a value for the {{consumer.partition.assignment.strategy}} property in the *worker configuration*, or set a value for the {{consumer.override.partition.assignment.strategy}} property in one or more *connector configurations* when per-connector client overrides are enabled in the worker config with {{connector.client.config.override.policy=ALL}}.
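The precedence described above can be sketched as a small hypothetical helper: the framework default goes in first, worker-level {{consumer.}} properties are layered on top, and connector-level {{consumer.override.}} properties are layered on top of those when the override policy allows it. This is a sketch under those assumptions, not the actual {{Worker}} code; the class and method names are invented, and the policy check is simplified (the real worker reports a disallowed override as a connector configuration error instead of silently skipping it).

{code:java}
import java.util.*;

class SinkConsumerConfigSketch {
    static final String STRATEGY = "partition.assignment.strategy";
    // Default proposed in this issue: prefer the new assignor, fall back to RangeAssignor.
    static final String PROPOSED_DEFAULT =
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
            + "org.apache.kafka.clients.consumer.RangeAssignor";

    // Returns the entries whose key starts with the prefix, with the prefix removed.
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new HashMap<>();
        props.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), value);
            }
        });
        return result;
    }

    // Hypothetical helper: later putAll calls win, so user settings replace the default.
    static Map<String, String> consumerConfig(Map<String, String> workerConfig,
                                              Map<String, String> connectorConfig) {
        Map<String, String> consumerProps = new HashMap<>();
        consumerProps.put(STRATEGY, PROPOSED_DEFAULT);               // 1. framework default
        consumerProps.putAll(withPrefix(workerConfig, "consumer."));  // 2. worker "consumer.*"
        // 3. connector "consumer.override.*", simplified to apply only under the ALL policy;
        //    treating an unset policy as "None" is an assumption of this sketch.
        String policy = workerConfig.getOrDefault("connector.client.config.override.policy", "None");
        if (policy.equalsIgnoreCase("All")) {
            consumerProps.putAll(withPrefix(connectorConfig, "consumer.override."));
        }
        return consumerProps;
    }

    public static void main(String[] args) {
        String roundRobinThenRange = "org.apache.kafka.clients.consumer.RoundRobinAssignor,"
            + "org.apache.kafka.clients.consumer.RangeAssignor";
        Map<String, String> worker = Map.of("connector.client.config.override.policy", "ALL");
        Map<String, String> connector = Map.of("consumer.override." + STRATEGY, roundRobinThenRange);

        // No user settings: the proposed framework default applies.
        System.out.println(consumerConfig(Map.of(), Map.of()).get(STRATEGY));
        // Connector-level override with the ALL policy: the user's list wins.
        System.out.println(consumerConfig(worker, connector).get(STRATEGY));
    }
}
{code}

With no user settings, {{main}} prints the proposed default; with the {{ALL}} policy and a connector override, it prints the connector's {{RoundRobinAssignor,RangeAssignor}} list.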
>
> In order to avoid task failures while the connector is being reconfigured, it is highly recommended that the consumer be configured with a list of both the new and the current partition assignment strategies, instead of just the new partition assignment strategy. For example, to update a connector configured to use the {{RangeAssignor}} strategy to use the {{RoundRobinAssignor}} strategy instead, add the following to the connector configuration:
> {code:java}
> "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor"
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)