krishnendu Das created KAFKA-14947:
--------------------------------------

             Summary: Duplicate records are getting created in the topic. 
                 Key: KAFKA-14947
                 URL: https://issues.apache.org/jira/browse/KAFKA-14947
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 3.1.1
            Reporter: krishnendu Das


We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1) 
for data ingestion. Previously we used Kafka (version 2.6.2) with the same 
Kafka Connect API (version 2.3.0), and data ingestion worked correctly. 

 

Recently we updated the Kafka version from 2.6.2 to 3.1.1.

After the update, we are seeing duplicate records from the source connector in 
the Kafka topic. While debugging the 3.1.1 code, we found that a new function,

{*}updateCommittableOffsets{*}(), was added and is called inside 
{*}WorkerSourceTask::execute{*}() as part of the bug fix "KAFKA-12226: Commit 
source task offsets without blocking on batch delivery (#11323)"

 

Because of this function, we are observing the following scenario:
 # At the start of execute(), updateCommittableOffsets() is called to check 
whether there are any offsets ready to be committed. Since the first poll has 
not happened yet, it finds nothing to commit.
 # Then the Kafka Connect API poll() method is called from 
WorkerSourceTask::execute(). *---------> 1st poll*
 # The Kafka Connect API (using the sleepy policy) reads one source file from 
the Cloud source directory.
 # The whole content of the file is read and the resulting records are sent to 
the Kafka broker to be written to the topic.
 # At the start of the second loop iteration, updateCommittableOffsets() finds 
offsets to commit and updates the reference variable committableOffsets, which 
WorkerSourceTask::commitOffsets() later uses to perform the actual offset 
commit.
 # Then the Kafka Connect API poll() method is called from 
*WorkerSourceTask::execute().* *---------> 2nd poll*
 # The Kafka Connect API (using the sleepy policy) reads the same source file 
again from the beginning, because offsetStorageReader::offset() did not return 
the latest offset.
 # The whole content of the file is read again and sent to the Kafka broker to 
write to the topic. ---> This creates duplicate records in the topic.
................................................
................................................
 # WorkerSourceTask::commitOffsets() commits the offsets.
................................................
................................................
 # Then the Kafka Connect API poll() method is called from 
{*}WorkerSourceTask::execute(){*}. ---------> 3rd poll
 # This time offsetStorageReader::offset() returns the latest offset.
 # The Kafka Connect API (using the sleepy policy) resumes reading the same 
source file from the last read position.
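The steps above can be sketched with a small, self-contained simulation. All 
names below are hypothetical (this is not the actual Connect runtime); the 
sketch only models the offset-visibility assumption: offsetStorageReader::offset() 
returns committed offsets only, so any poll that runs before 
WorkerSourceTask::commitOffsets() re-reads the file from the start.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the race described above. Hypothetical names; the point
// is only that uncommitted offsets are invisible to the next poll.
public class DuplicatePollSketch {
    private final List<String> file;
    // Committed offsets: the only state offsetStorageReader::offset() can see.
    private final Map<String, Integer> committed = new HashMap<>();
    // Offsets produced by polls but not yet committed.
    private final Map<String, Integer> pending = new HashMap<>();

    public DuplicatePollSketch(List<String> file) {
        this.file = file;
    }

    // Mimics offsetStorageReader::offset(): only committed state is visible.
    public int storedOffset(String source) {
        return committed.getOrDefault(source, 0);
    }

    // Mimics SourceTask::poll(): reads the file from the stored offset and
    // remembers the new offset as "pending" until the worker commits it.
    public List<String> poll(String source) {
        int start = storedOffset(source);
        pending.put(source, file.size());
        return new ArrayList<>(file.subList(start, file.size()));
    }

    // Mimics WorkerSourceTask::commitOffsets(): publishes pending offsets.
    public void commitOffsets() {
        committed.putAll(pending);
    }

    public static void main(String[] args) {
        DuplicatePollSketch task =
                new DuplicatePollSketch(List.of("rec-0", "rec-1", "rec-2"));
        System.out.println("1st poll: " + task.poll("fileA"));
        // The commit has not run yet, so the 2nd poll re-reads everything.
        System.out.println("2nd poll: " + task.poll("fileA") + "  <-- duplicates");
        task.commitOffsets();
        // After the commit, the 3rd poll resumes from the last position.
        System.out.println("3rd poll: " + task.poll("fileA"));
    }
}
```

Under these assumptions, any poll that races ahead of the asynchronous offset 
commit re-ingests the whole file, which matches the duplicate-data behaviour 
described in the steps above.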



--
This message was sent by Atlassian Jira
(v8.20.10#820010)