[ 
https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated KAFKA-13601:
--------------------------------
    Description: 
Exactly-once delivery in the S3 sink connector with scheduled rotation and a
field partitioner can be achieved if consumer offsets are committed
synchronously.

Currently, WorkerSinkTask commits the consumer offsets asynchronously unless
the task is closing:
    private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets,
                          boolean closing, final int seqno) {
        log.info("{} Committing offsets", this);
        if (closing) {
            doCommitSync(offsets, seqno);
        } else {
            OffsetCommitCallback cb = new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception error) {
                    lastCommittedOffsets = offsets;
                    onCommitCompleted(error, seqno);
                }
            };
            consumer.commitAsync(offsets, cb);
        }
    }
Add a sink configuration option to choose synchronous offset commits.
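
As a rough illustration of the request, the sketch below models a sink task
that takes the synchronous path whenever a flag is set, not only when closing.
It is not Kafka Connect code: `Committer`, `RecordingCommitter`,
`SinkTaskSketch`, and the `syncCommit` flag are hypothetical names invented for
this example, and offsets are simplified to a `Map<String, Long>` so the
snippet runs without the Kafka client library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Sketch only: every type here is a hypothetical stand-in, not a Kafka Connect class. */
class SyncCommitSketch {

    /** Hypothetical stand-in for the blocking and non-blocking commit styles of a consumer. */
    interface Committer {
        void commitSync(Map<String, Long> offsets);
        void commitAsync(Map<String, Long> offsets,
                         BiConsumer<Map<String, Long>, Exception> callback);
    }

    /** Test double that records which commit path was taken. */
    static class RecordingCommitter implements Committer {
        final List<String> calls = new ArrayList<>();

        @Override
        public void commitSync(Map<String, Long> offsets) {
            calls.add("sync");
        }

        @Override
        public void commitAsync(Map<String, Long> offsets,
                                BiConsumer<Map<String, Long>, Exception> callback) {
            calls.add("async");
            callback.accept(offsets, null); // pretend the broker acknowledged at once
        }
    }

    /** Simplified model of the commit decision; syncCommit is the assumed new option. */
    static class SinkTaskSketch {
        private final Committer consumer;
        private final boolean syncCommit;
        Map<String, Long> lastCommittedOffsets = new HashMap<>();

        SinkTaskSketch(Committer consumer, boolean syncCommit) {
            this.consumer = consumer;
            this.syncCommit = syncCommit;
        }

        void doCommit(Map<String, Long> offsets, boolean closing) {
            if (closing || syncCommit) {
                // Blocking path: returns only after the commit call has finished.
                consumer.commitSync(offsets);
                lastCommittedOffsets = offsets;
            } else {
                // Non-blocking path: the result arrives later through the callback.
                consumer.commitAsync(offsets, (committed, error) -> {
                    if (error == null) {
                        lastCommittedOffsets = committed;
                    }
                });
            }
        }
    }

    public static void main(String[] args) {
        RecordingCommitter withFlag = new RecordingCommitter();
        new SinkTaskSketch(withFlag, true).doCommit(Map.of("topic-0", 42L), false);
        System.out.println(withFlag.calls); // prints [sync]

        RecordingCommitter withoutFlag = new RecordingCommitter();
        new SinkTaskSketch(withoutFlag, false).doCommit(Map.of("topic-0", 42L), false);
        System.out.println(withoutFlag.calls); // prints [async]
    }
}
```

With the flag off, the model keeps the existing behaviour (asynchronous unless
closing), so an option along these lines could default to the current
semantics.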

  was:
Exactly-once delivery in the S3 sink connector with scheduled rotation and a
field partitioner can be achieved if consumer offsets are committed
synchronously.

Currently, WorkerSinkTask commits the consumer offsets asynchronously unless
the task is closing:
    private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets,
                          boolean closing, final int seqno) {
        log.info("{} Committing offsets", this);
        if (closing) {
            doCommitSync(offsets, seqno);
        } else {
            OffsetCommitCallback cb = new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception error) {
                    lastCommittedOffsets = offsets;
                    onCommitCompleted(error, seqno);
                }
            };
            consumer.commitAsync(offsets, cb);
        }
    }
 

Add a sink configuration option to choose synchronous offset commits.


> Add option to support sync offset commit in Kafka Connect Sink
> --------------------------------------------------------------
>
>                 Key: KAFKA-13601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13601
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Anil Dasari
>            Priority: Major
>
> Exactly-once delivery in the S3 sink connector with scheduled rotation and a
> field partitioner can be achieved if consumer offsets are committed
> synchronously.
> Currently, WorkerSinkTask commits the consumer offsets asynchronously unless
> the task is closing:
>     private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets,
>                           boolean closing, final int seqno) {
>         log.info("{} Committing offsets", this);
>         if (closing) {
>             doCommitSync(offsets, seqno);
>         } else {
>             OffsetCommitCallback cb = new OffsetCommitCallback() {
>                 @Override
>                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
>                                        Exception error) {
>                     lastCommittedOffsets = offsets;
>                     onCommitCompleted(error, seqno);
>                 }
>             };
>             consumer.commitAsync(offsets, cb);
>         }
>     }
> Add a sink configuration option to choose synchronous offset commits.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
