[ https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anil Dasari updated KAFKA-13601: -------------------------------- Description: Exactly once in s3 connector with scheduled rotation and field partitioner can be achieved with consumer offset sync' commit. Currently, WorkerSinkTask committing the consumer offsets asynchronously. private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) \{ log.info("{} Committing offsets", this); if (closing) \{ doCommitSync(offsets, seqno); } else \{ OffsetCommitCallback cb = new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) { lastCommittedOffsets = offsets; onCommitCompleted(error, seqno); } }; consumer.commitAsync(offsets, cb); } } Add config to sink to chose sync' offset commit was: Exactly once in s3 connector with scheduled rotation and field partitioner can be achieved with consumer offset sync' commit. Currently, WorkerSinkTask committing the consumer offsets asynchronously. private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) \{ log.info("{} Committing offsets", this); if (closing) \{ doCommitSync(offsets, seqno); } else \{ OffsetCommitCallback cb = new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) { lastCommittedOffsets = offsets; onCommitCompleted(error, seqno); } }; consumer.commitAsync(offsets, cb); } } Add config to sink to chose sync' offset commit > Add option to support sync offset commit in Kafka Connect Sink > -------------------------------------------------------------- > > Key: KAFKA-13601 > URL: https://issues.apache.org/jira/browse/KAFKA-13601 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect > Reporter: Anil Dasari > Priority: Major > > Exactly once in s3 connector with scheduled rotation and field partitioner > can be achieved with consumer offset sync' commit. > Currently, WorkerSinkTask committing the consumer offsets asynchronously. > private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean > closing, final int seqno) \{ > log.info("{} Committing offsets", this); > if (closing) \{ > doCommitSync(offsets, seqno); > } else \{ > OffsetCommitCallback cb = new OffsetCommitCallback() { > @Override > public void onComplete(Map<TopicPartition, OffsetAndMetadata> > offsets, Exception error) { > lastCommittedOffsets = offsets; > onCommitCompleted(error, seqno); > } > }; > consumer.commitAsync(offsets, cb); > } > } > Add config to sink to chose sync' offset commit -- This message was sent by Atlassian Jira (v8.20.1#820001)