[ 
https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated KAFKA-13601:
--------------------------------
    Description: 
Exactly-once delivery in the S3 connector with scheduled rotation and a field 
partitioner can be achieved by committing consumer offsets synchronously after 
a message batch has been flushed to the sink successfully.

Currently, WorkerSinkTask commits the consumer offsets asynchronously. 

[https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]

[https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]

[https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]

 

Add a config to allow the user to select synchronous commit over 
WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.
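
As a rough illustration of the requested behaviour, the sketch below shows how 
a boolean setting could route the commit to the synchronous path. It is not the 
WorkerSinkTask code: OffsetCommitter is a stand-in for the consumer, offsets are 
simplified to a Map<String, Long>, and the config name "offset.commit.sync" and 
the syncCommit parameter are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SyncCommitSketch {
    /** Hypothetical config key, invented for this sketch. */
    static final String SYNC_COMMIT_CONFIG = "offset.commit.sync";

    /** Stand-in for the two commit calls; NOT the real Kafka consumer API. */
    interface OffsetCommitter {
        void commitSync(Map<String, Long> offsets);
        void commitAsync(Map<String, Long> offsets);
    }

    /** Test double that records which commit path was taken. */
    static class RecordingCommitter implements OffsetCommitter {
        final List<String> calls = new ArrayList<>();

        @Override
        public void commitSync(Map<String, Long> offsets) {
            calls.add("sync");
        }

        @Override
        public void commitAsync(Map<String, Long> offsets) {
            calls.add("async");
        }
    }

    /**
     * Commits synchronously when the task is closing or when the
     * (hypothetical) sync option is enabled; otherwise commits asynchronously.
     */
    static void doCommit(OffsetCommitter consumer, Map<String, Long> offsets,
                         boolean closing, boolean syncCommit) {
        if (closing || syncCommit) {
            consumer.commitSync(offsets);
        } else {
            consumer.commitAsync(offsets);
        }
    }

    public static void main(String[] args) {
        RecordingCommitter consumer = new RecordingCommitter();
        Map<String, Long> offsets = Map.of("topic-0", 42L);
        doCommit(consumer, offsets, false, false); // default: asynchronous
        doCommit(consumer, offsets, false, true);  // option enabled: synchronous
        doCommit(consumer, offsets, true, false);  // closing: always synchronous
        System.out.println(consumer.calls);
    }
}
```

With the option disabled the behaviour matches the current code path, so 
existing deployments would be unaffected unless they opt in.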

  was:
Exactly-once delivery in the S3 connector with scheduled rotation and a field 
partitioner can be achieved by committing consumer offsets synchronously. 

Currently, WorkerSinkTask commits the consumer offsets asynchronously. 
    private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
        log.info("{} Committing offsets", this);
        if (closing) {
            doCommitSync(offsets, seqno);
        } else {
            OffsetCommitCallback cb = new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
                    lastCommittedOffsets = offsets;
                    onCommitCompleted(error, seqno);
                }
            };
            consumer.commitAsync(offsets, cb);
        }
    }
Add a config to the sink to choose synchronous offset commit.


> Add option to support sync offset commit in Kafka Connect Sink
> --------------------------------------------------------------
>
>                 Key: KAFKA-13601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13601
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Anil Dasari
>            Priority: Major
>
> Exactly-once delivery in the S3 connector with scheduled rotation and a field 
> partitioner can be achieved by committing consumer offsets synchronously after 
> a message batch has been flushed to the sink successfully.
> Currently, WorkerSinkTask commits the consumer offsets asynchronously. 
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]
>  
> Add a config to allow the user to select synchronous commit over 
> WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
