[ https://issues.apache.org/jira/browse/KAFKA-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dc-heros reassigned KAFKA-12857:
--------------------------------
Assignee: dc-heros
> Using Connect Sink with CooperativeStickyAssignor results in commit offsets failure
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-12857
> URL: https://issues.apache.org/jira/browse/KAFKA-12857
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.7.1
> Environment: Linux
> Reporter: Oliver Hsu
> Assignee: dc-heros
> Priority: Major
>
> We are attempting to use a Kafka Connect sink connector with the
> {{CooperativeStickyAssignor}} assignment strategy. When we use
> {{CooperativeStickyAssignor}}, offset commits sometimes fail with:
> {{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} Ignoring invalid task provided offset mytopic-0/OffsetAndMetadata\{offset=16305575, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[mytopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}
> Note that the partition the warning reports as "not assigned" ({{mytopic-0}})
> appears in the assignment printed by the same message ({{assignment=[mytopic-0]}}).
> *Config changes*
> {{consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor}}
> *Cooperative vs Eager Assignment Strategy background*
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceListenerandConsumerPartitionAssignorSemantics]
> With eager assignment:
> {quote}Listener#onPartitionsAssigned: called on the full set of assigned
> partitions (may have overlap with the partitions passed to
> #onPartitionsRevoked)
> {quote}
> With cooperative assignment:
> {quote}Listener#onPartitionsAssigned: called on the subset of assigned
> partitions that were not previously owned before this rebalance. There should
> be no overlap with the revoked partitions (if any). This will always be
> called, even if there are no new partitions being assigned to a given member.
> {quote}
> This means that with cooperative assignment, {{onPartitionsAssigned}} may be
> called with only part of the assignment, or with an empty collection.
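The difference between the two quoted semantics can be sketched as a small set computation. This is a simplified model under stated assumptions, not Kafka code: partitions are plain strings such as "mytopic-0" instead of {{TopicPartition}}, and each rebalance is reduced to "previously owned set" versus "new assignment" (real cooperative rebalances may span two rounds).

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified model (an assumption, not Kafka code) of the partition sets that
// ConsumerRebalanceListener callbacks receive under the two KIP-429 protocols.
// Partitions are plain Strings instead of TopicPartition.
final class RebalanceCallbackModel {

    // Eager: onPartitionsAssigned receives the full new assignment.
    // previouslyOwned is unused; it is kept so all methods share one signature.
    static Set<String> eagerAssigned(Set<String> previouslyOwned, Set<String> newAssignment) {
        return new TreeSet<>(newAssignment);
    }

    // Cooperative: onPartitionsAssigned receives only partitions that were not
    // previously owned. The result may be empty; the callback still fires.
    static Set<String> cooperativeAssigned(Set<String> previouslyOwned, Set<String> newAssignment) {
        Set<String> added = new TreeSet<>(newAssignment);
        added.removeAll(previouslyOwned);
        return added;
    }

    // Cooperative: only partitions that are moving away are revoked.
    static Set<String> cooperativeRevoked(Set<String> previouslyOwned, Set<String> newAssignment) {
        Set<String> revoked = new TreeSet<>(previouslyOwned);
        revoked.removeAll(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("mytopic-0");
        Set<String> next = Set.of("mytopic-0"); // assignment unchanged by this rebalance
        System.out.println("eager assigned:       " + eagerAssigned(owned, next));       // [mytopic-0]
        System.out.println("cooperative assigned: " + cooperativeAssigned(owned, next)); // []
    }
}
```

A listener that treats the cooperative argument as the full assignment will therefore see an empty assignment whenever a rebalance leaves its partitions unchanged.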
> However, the
> [WorkerSinkTask.HandleRebalance|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L680]
> class assumes that {{onPartitionsAssigned}} is called with the full set of
> assigned partitions, which is true for eager assignment but not for cooperative
> assignment.
> {code:java|title=WorkerSinkTask.HandleRebalance.java|borderStyle=solid}
> public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>     log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>     // Both maps are replaced on every call, so afterwards they contain only
>     // the partitions passed to this particular call.
>     lastCommittedOffsets = new HashMap<>();
>     currentOffsets = new HashMap<>();
>     for (TopicPartition tp : partitions) {
>         long pos = consumer.position(tp);
>         lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>         currentOffsets.put(tp, new OffsetAndMetadata(pos));
>         log.debug("{} Assigned topic partition {} with offset {}",
>                 WorkerSinkTask.this, tp, pos);
>     }
>     ...
> }
> {code}
> {{onPartitionsAssigned}} replaces both {{lastCommittedOffsets}} and
> {{currentOffsets}} with new, empty {{HashMap}} instances and then puts only the
> offsets of the {{partitions}} argument into them.
> In the logs we see:
> {{[2021-05-26 22:02:09,785] DEBUG WorkerSinkTask\{id=sink-connector-7} Partitions assigned [mytopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
> {{[2021-05-26 22:02:13,063] DEBUG WorkerSinkTask\{id=sink-connector-7} Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
> {{[2021-05-26 22:02:16,074] DEBUG WorkerSinkTask\{id=sink-connector-7} Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
> These logs show that, with {{CooperativeStickyAssignor}}, {{onPartitionsAssigned}}
> is called first with the partition assigned to the task and then, on later
> rebalances, with an empty {{partitions}} collection.
> Because every call replaces the maps, the later calls with an empty collection
> leave {{lastCommittedOffsets}} (and {{currentOffsets}}) as an empty {{HashMap}},
> even though the task still owns {{mytopic-0}}.
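The effect of that callback sequence can be reproduced outside Kafka with a stand-in for the quoted {{onPartitionsAssigned}}. This is a sketch under stated assumptions: partitions are Strings, offsets are Longs, and {{consumer.position(tp)}} is replaced by a lookup in a fixed map whose value (100) is hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in (an assumption for illustration) for the quoted HandleRebalance logic.
// Partitions are Strings, offsets are Longs, and consumer.position(tp) is
// replaced by a lookup in a fixed map of hypothetical positions.
final class ReplacingOffsetTracker {
    private final Map<String, Long> positions;
    Map<String, Long> lastCommittedOffsets = new HashMap<>();
    Map<String, Long> currentOffsets = new HashMap<>();

    ReplacingOffsetTracker(Map<String, Long> positions) {
        this.positions = positions;
    }

    // Mirrors the quoted code: both maps are replaced on every call.
    void onPartitionsAssigned(Collection<String> partitions) {
        lastCommittedOffsets = new HashMap<>();
        currentOffsets = new HashMap<>();
        for (String tp : partitions) {
            long pos = positions.get(tp);
            lastCommittedOffsets.put(tp, pos);
            currentOffsets.put(tp, pos);
        }
    }

    public static void main(String[] args) {
        ReplacingOffsetTracker task = new ReplacingOffsetTracker(Map.of("mytopic-0", 100L));
        // Same callback sequence as the DEBUG logs: [mytopic-0], then [], then [].
        task.onPartitionsAssigned(List.of("mytopic-0"));
        System.out.println(task.lastCommittedOffsets); // {mytopic-0=100}
        task.onPartitionsAssigned(List.of());
        task.onPartitionsAssigned(List.of());
        System.out.println(task.lastCommittedOffsets); // {}
    }
}
```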
> Inside
> [WorkerSinkTask.commitOffsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L415-L419],
> {{commitableOffsets}} starts as a copy of {{lastCommittedOffsets}}, which is
> now an empty {{HashMap}}:
> {code:java|title=WorkerSinkTask.java|borderStyle=solid}
> private void commitOffsets(long now, boolean closing) {
>     ...
>     // Only partitions already present in lastCommittedOffsets can be committed.
>     final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
>     for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
>         final TopicPartition partition = taskProvidedOffsetEntry.getKey();
>         final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
>         if (commitableOffsets.containsKey(partition)) {
>             long taskOffset = taskProvidedOffset.offset();
>             long currentOffset = currentOffsets.get(partition).offset();
>             if (taskOffset <= currentOffset) {
>                 commitableOffsets.put(partition, taskProvidedOffset);
>             } else {
>                 log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}",
>                         this, partition, taskProvidedOffset, taskOffset, currentOffset);
>             }
>         } else {
>             // Reached whenever lastCommittedOffsets lacks the partition, even if
>             // the consumer still owns it.
>             log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}",
>                     this, partition, taskProvidedOffset, consumer.assignment());
>         }
>     }
>     ...
> }
> {code}
> {{if (commitableOffsets.containsKey(partition))}} evaluates to {{false}} because
> {{commitableOffsets}} is a copy of the empty {{lastCommittedOffsets}} map. This
> triggers the {{Ignoring invalid task provided offset ... -- partition not assigned}}
> warning even though the task is assigned the partition, and the task-provided
> offset is not committed.
> For example:
> {{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} Ignoring invalid task provided offset mytopic-0/OffsetAndMetadata\{offset=16305575, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[mytopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}
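The filtering step in {{commitOffsets}} can be modeled in isolation to show why an empty {{lastCommittedOffsets}} drops every task-provided offset. This is a sketch, not Kafka code: partitions are Strings, offsets are Longs, the WARN logging is reduced to comments, and all offset values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Model (an assumption for illustration) of the filter in the quoted
// commitOffsets code. Partitions are Strings and offsets are Longs.
final class CommitFilterModel {

    // Returns the offsets that would be committed.
    static Map<String, Long> committable(Map<String, Long> lastCommittedOffsets,
                                         Map<String, Long> currentOffsets,
                                         Map<String, Long> taskProvidedOffsets) {
        Map<String, Long> commitable = new HashMap<>(lastCommittedOffsets);
        for (Map.Entry<String, Long> entry : taskProvidedOffsets.entrySet()) {
            String partition = entry.getKey();
            long taskOffset = entry.getValue();
            if (!commitable.containsKey(partition)) {
                // Real code: WARN "partition not assigned"; the offset is ignored
                // even if the consumer still owns the partition.
                continue;
            }
            if (taskOffset <= currentOffsets.get(partition)) {
                commitable.put(partition, taskOffset);
            }
            // Otherwise the real code logs WARN "not yet consumed" and ignores it.
        }
        return commitable;
    }

    public static void main(String[] args) {
        Map<String, Long> provided = Map.of("mytopic-0", 15L);
        // Maps populated (hypothetical offsets): the task-provided offset is accepted.
        System.out.println(committable(Map.of("mytopic-0", 10L), Map.of("mytopic-0", 20L), provided)); // {mytopic-0=15}
        // Maps emptied by the cooperative callbacks: nothing is committed.
        System.out.println(committable(Map.of(), Map.of(), provided)); // {}
    }
}
```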
> *Recommended Changes*
> {{WorkerSinkTask.HandleRebalance.onPartitionsAssigned}} needs to handle the
> cooperative protocol when initializing {{lastCommittedOffsets}}: under that
> protocol {{onPartitionsAssigned}} may be called with only the newly assigned
> partitions, or with an empty collection if no new partitions are assigned to
> the member, so it should update the existing offset maps rather than replace them.
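One possible shape for that change is to keep the offset maps across rebalances, add entries in {{onPartitionsAssigned}}, and remove entries in {{onPartitionsRevoked}}. This is a minimal sketch under stated assumptions, not the actual Kafka patch: partitions are Strings, offsets are Longs, {{consumer.position(tp)}} is replaced by a lookup in a fixed map of hypothetical positions, and a real task would also need to commit or close revoked partitions first (and presumably treat {{onPartitionsLost}} like a revocation), which is omitted here.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of one possible fix (an assumption, not the actual Kafka patch): keep
// the offset maps across rebalances and update them incrementally.
// Partitions are Strings, offsets are Longs, and consumer.position(tp) is
// replaced by a lookup in a fixed map of hypothetical positions.
final class IncrementalOffsetTracker {
    private final Map<String, Long> positions;
    final Map<String, Long> lastCommittedOffsets = new HashMap<>();
    final Map<String, Long> currentOffsets = new HashMap<>();

    IncrementalOffsetTracker(Map<String, Long> positions) {
        this.positions = positions;
    }

    // Cooperative: 'partitions' holds only newly added partitions (possibly none),
    // so entries are added instead of replacing the maps. Eager also works,
    // because onPartitionsRevoked has already removed every owned partition.
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String tp : partitions) {
            long pos = positions.get(tp);
            lastCommittedOffsets.put(tp, pos);
            currentOffsets.put(tp, pos);
        }
    }

    // Forget only the partitions that are actually leaving this task.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String tp : partitions) {
            lastCommittedOffsets.remove(tp);
            currentOffsets.remove(tp);
        }
    }

    public static void main(String[] args) {
        IncrementalOffsetTracker task =
                new IncrementalOffsetTracker(Map.of("mytopic-0", 100L, "mytopic-1", 200L));
        task.onPartitionsAssigned(List.of("mytopic-0"));
        // Later cooperative rebalances that bring no new partitions.
        task.onPartitionsAssigned(List.of());
        task.onPartitionsAssigned(List.of());
        System.out.println(task.lastCommittedOffsets); // {mytopic-0=100}
    }
}
```

With this shape, the empty {{onPartitionsAssigned}} calls seen in the DEBUG logs leave {{mytopic-0}} in both maps, so its task-provided offsets would pass the {{containsKey}} check in {{commitOffsets}}.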
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)