Hi Chris,

I really don't understand why a graceful shutdown would happen during a
commit operation. Am I misunderstanding something here? I see this happen
when the first batch has 2 valid records and the second batch contains an
invalid record. In that case I want to commit the valid records, so I
called commit and returned an empty list for the current batch from poll().
Then, when the next file comes in and poll() returns new records, I see
InvalidProducerEpochException.
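
Roughly, this is what the poll() path in my task looks like (a simplified
sketch; the parser/helper names are placeholders, not the real ones):

  @Override
  public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> batch = readNextBatch();        // e.g. 2 records per batch
    if (containsInvalidRecord(batch)) {
      // commit the valid records returned from earlier polls
      transactionContext.commitTransaction();
      // return an empty batch for this poll
      return Collections.emptyList();
    }
    return batch;
  }

The exception shows up on the first batch returned after that empty poll.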
Please advise me.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
>
> The difference is in the Task Classes, no difference for value/key
> convertors.
>
> I don’t see log messages for graceful shutdown. I am not clear on what you
> mean by shutting down the task.
>
> I called the commit operation for the successful records. Should I perform
> any other steps if I have an invalid record?
> Please advise.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
>> Hi Nitty,
>>
>> Thanks again for all the details here, especially the log messages.
>>
>> > The below mentioned issue is happening for the Json connector only. Is
>> > there any difference between the asn1, binary, csv and json connectors?
>>
>> Can you clarify if the difference here is in the Connector/Task classes,
>> or if it's in the key/value converters that are configured for the
>> connector? The key/value converters are configured using the
>> "key.converter" and "value.converter" properties and, if problems arise
>> with them, the task will fail and, if it has a non-empty ongoing
>> transaction, that transaction will be automatically aborted since we close
>> the task's Kafka producer when it fails (or shuts down gracefully).
>>
>> With regards to these log messages:
>>
>> > org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>> producer with the same transactionalId which fences the current one.
>>
>> It looks like your tasks aren't shutting down gracefully in time, which
>> causes them to be fenced out by the Connect framework later on. Do you see
>> messages like "Graceful stop of task <TASK ID HERE> failed" in the logs
>> for
>> your Connect worker?
>>
>> Cheers,
>>
>> Chris
>>
>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com>
>> wrote:
>>
>> > Hi Chris,
>> >
>> > As you said, the messages below appear when I call an abort if there is
>> > an invalid record; then, for the next transaction, I see the same
>> > messages and the connector stops.
>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
>> Aborting
>> > transaction for batch as requested by connector
>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>> > [task-thread-json-sftp-source-connector-0]
>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
>> [Producer
>> > clientId=connector-producer-json-sftp-source-connector-0,
>> > transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
>> > incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer)
>> > [task-thread-json-sftp-source-connector-0]
>> >
>> > The issue with InvalidProducerEpochException happens when I call the
>> > commit after an invalid record; in the next transaction I get
>> > InvalidProducerEpochException, and the messages are copied in the
>> > previous email. I don't know if this will also be fixed by your bug fix.
>> > I am using Kafka 3.3.1 as of now.
>> >
>> > Thanks,
>> > Nitty
>> >
>> >
>> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <nittybe...@gmail.com>
>> wrote:
>> >
>> > > Hi Chris,
>> > >
>> > > The below mentioned issue is happening for the Json connector only. Is
>> > > there any difference between the asn1, binary, csv and json connectors?
>> > >
>> > > Thanks,
>> > > Nitty
>> > >
>> > > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com>
>> > wrote:
>> > >
>> > >> Hi Chris,
>> > >>
>> > >> Sorry Chris, I am not able to reproduce the above issue.
>> > >>
>> > >> I want to share with you one more use case I found.
>> > >> The use case: the first batch returns 2 valid records, and the next
>> > >> batch contains an invalid record. Below is the transaction_state topic
>> > >> entry when I call a commit while processing the invalid record.
>> > >>
>> > >>
>> >
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> > >> txnStartTimestamp=1678620463834,
>> txnLastUpdateTimestamp=1678620463834)
>> > >>
>> > >> then after some time I saw the below states as well,
>> > >>
>> > >>
>> >
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000,
>> > state=*PrepareAbort*,
>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> > >> txnStartTimestamp=1678620463834,
>> txnLastUpdateTimestamp=1678620526119)
>> > >>
>> >
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000,
>> > state=*CompleteAbort*,
>> > >> pendingState=None, topicPartitions=HashSet(),
>> > >> txnStartTimestamp=1678620463834,
>> txnLastUpdateTimestamp=1678620526121)
>> > >>
>> > >> Later, for the next transaction, when it returns the first batch,
>> > >> below are the logs I see.
>> > >>
>> > >>  Transiting to abortable error state due to
>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException:
>> Producer
>> > >> attempted to produce with an old epoch.
>> > >> (org.apache.kafka.clients.producer.internals.TransactionManager)
>> > >> [kafka-producer-network-thread |
>> > >> connector-producer-json-sftp-source-connector-0]
>> > >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed
>> to
>> > send
>> > >> record to streams-input:
>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
>> > >> [kafka-producer-network-thread |
>> > >> connector-producer-json-sftp-source-connector-0]
>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException:
>> Producer
>> > >> attempted to produce with an old epoch.
>> > >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0]
>> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
>> > >> transactionalId=connect-cluster-json-sftp-source-connector-0]
>> > Transiting to
>> > >> fatal error state due to
>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>> newer
>> > >> producer with the same transactionalId which fences the current one.
>> > >> (org.apache.kafka.clients.producer.internals.TransactionManager)
>> > >> [kafka-producer-network-thread |
>> > >> connector-producer-json-sftp-source-connector-0]
>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
>> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
>> > >> transactionalId=connect-cluster-json-sftp-source-connector-0]
>> Aborting
>> > >> producer batches due to fatal error
>> > >> (org.apache.kafka.clients.producer.internals.Sender)
>> > >> [kafka-producer-network-thread |
>> > >> connector-producer-json-sftp-source-connector-0]
>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>> newer
>> > >> producer with the same transactionalId which fences the current one.
>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed
>> to
>> > >> flush offsets to storage:
>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>> > >> [kafka-producer-network-thread |
>> > >> connector-producer-json-sftp-source-connector-0]
>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>> newer
>> > >> producer with the same transactionalId which fences the current one.
>> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed
>> to
>> > send
>> > >> record to streams-input:
>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
>> > >> [kafka-producer-network-thread |
>> > >> connector-producer-json-sftp-source-connector-0]
>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>> newer
>> > >> producer with the same transactionalId which fences the current one.
>> > >> 2023-03-12 11:32:45,222 ERROR
>> > [json-sftp-source-connector|task-0|offsets]
>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed
>> to
>> > >> commit producer transaction
>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>> > >> [task-thread-json-sftp-source-connector-0]
>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a
>> newer
>> > >> producer with the same transactionalId which fences the current one.
>> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task
>> threw
>> > an
>> > >> uncaught and unrecoverable exception. Task is being killed and will
>> not
>> > >> recover until manually restarted
>> > >> (org.apache.kafka.connect.runtime.WorkerTask)
>> > >> [task-thread-json-sftp-source-connector-0]
>> > >>
>> > >> Do you know why it is showing an abort state even though I called commit?
>> > >>
>> > >> I tested one more scenario. When I call the commit I saw the entry below:
>> > >>
>> >
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> > >> txnStartTimestamp=1678620463834,
>> txnLastUpdateTimestamp=1678620463834)
>> > >> Then, before the state changed to Abort, I dropped the next file, and
>> > >> I don't see any issues; both the previous transaction and the current
>> > >> transaction are committed.
>> > >>
>> > >> Thank you for your support.
>> > >>
>> > >> Thanks,
>> > >> Nitty
>> > >>
>> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton
>> <chr...@aiven.io.invalid>
>> > >> wrote:
>> > >>
>> > >>> Hi Nitty,
>> > >>>
>> > >>> > I called commitTransaction when I reach the first error record,
>> but
>> > >>> commit is not happening for me. Kafka connect tries to abort the
>> > >>> transaction automatically
>> > >>>
>> > >>> This is really interesting--are you certain that your task never
>> > invoked
>> > >>> TransactionContext::abortTransaction in this case? I'm looking over
>> the
>> > >>> code base and it seems fairly clear that the only thing that could
>> > >>> trigger
>> > >>> a call to KafkaProducer::abortTransaction is a request by the task
>> to
>> > >>> abort
>> > >>> a transaction (either for a next batch, or for a specific record).
>> It
>> > may
>> > >>> help to run the connector in a debugger and/or look for "Aborting
>> > >>> transaction for batch as requested by connector" or "Aborting
>> > transaction
>> > >>> for record on topic <TOPIC NAME HERE> as requested by connector" log
>> > >>> messages (which will be emitted at INFO level by
>> > >>> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask
>> class
>> > if
>> > >>> the task is requesting an abort).
>> > >>>
>> > >>> Regardless, I'll work on a fix for the bug with aborting empty
>> > >>> transactions. Thanks for helping uncover that one!
>> > >>>
>> > >>> Cheers,
>> > >>>
>> > >>> Chris
>> > >>>
>> > >>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com>
>> > wrote:
>> > >>>
>> > >>> > Hi Chris,
>> > >>> >
>> > >>> > We have a use case to commit the previous successful records, stop
>> > >>> > processing the current file, and move on to the next file. To achieve
>> > >>> > that I called commitTransaction when I reached the first error record,
>> > >>> > but the commit is not happening for me. Kafka Connect tries to abort
>> > >>> > the transaction automatically; I checked the _transaction_state topic
>> > >>> > and the states are marked as PrepareAbort and CompleteAbort. Do you
>> > >>> > know why Kafka Connect automatically invokes abort instead of the
>> > >>> > commit I called?
>> > >>> > Then, as a result, when I try to parse the next file - say ABC - I saw
>> > >>> > the logs "Aborting incomplete transaction" and the error "failed to
>> > >>> > send record to topic", and we lost the first batch of records from the
>> > >>> > current transaction in file ABC.
>> > >>> >
>> > >>> > Is it possible that there's a case where an abort is being
>> requested
>> > >>> while
>> > >>> > the current transaction is empty (i.e., the task hasn't returned
>> any
>> > >>> > records from SourceTask::poll since the last transaction was
>> > >>> > committed/aborted)? --- Yes, that case is possible for us. There
>> is a
>> > >>> case
>> > >>> > where the first record itself is an error record.
>> > >>> >
>> > >>> > Thanks,
>> > >>> > Nitty
>> > >>> >
>> > >>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton
>> <chr...@aiven.io.invalid
>> > >
>> > >>> > wrote:
>> > >>> >
>> > >>> > > Hi Nitty,
>> > >>> > >
>> > >>> > > Thanks for the code examples and the detailed explanations,
>> this is
>> > >>> > really
>> > >>> > > helpful!
>> > >>> > >
>> > >>> > > > Say if I have a file with 5 records and batch size is 2, and
>> in
>> > my
>> > >>> 3rd
>> > >>> > > batch I have one error record then in that batch, I dont have a
>> > valid
>> > >>> > > record to call commit or abort. But I want to commit all the
>> > previous
>> > >>> > > batches that were successfully parsed. How do I do that?
>> > >>> > >
>> > >>> > > An important thing to keep in mind with the TransactionContext
>> API
>> > is
>> > >>> > that
>> > >>> > > all records that a task returns from SourceTask::poll are
>> > implicitly
>> > >>> > > included in a transaction. Invoking
>> > >>> SourceTaskContext::transactionContext
>> > >>> > > doesn't alter this or cause transactions to start being used;
>> > >>> everything
>> > >>> > is
>> > >>> > > already in a transaction, and the Connect runtime automatically
>> > >>> begins
>> > >>> > > transactions for any records it sees from the task if it hasn't
>> > >>> already
>> > >>> > > begun one. It's also valid to return a null or empty list of
>> > records
>> > >>> from
>> > >>> > > SourceTask::poll. So in this case, you can invoke
>> > >>> > > transactionContext.commitTransaction() (the no-args variant) and
>> > >>> return
>> > >>> > an
>> > >>> > > empty batch from SourceTask::poll, which will cause the
>> transaction
>> > >>> > > containing the 4 valid records that were returned in the last 2
>> > >>> batches
>> > >>> > to
>> > >>> > > be committed.
>> > >>> > >
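>> > >>> > > As a rough sketch of that case (the parser/helper names here are just
>> > >>> > > placeholders, not from your connector):
>> > >>> > >
>> > >>> > >   @Override
>> > >>> > >   public List<SourceRecord> poll() throws InterruptedException {
>> > >>> > >     List<SourceRecord> batch = readNextBatch();
>> > >>> > >     if (firstRecordIsInvalid(batch)) {
>> > >>> > >       // commit the 4 valid records already returned in the previous 2 batches
>> > >>> > >       context.transactionContext().commitTransaction();
>> > >>> > >       // returning an empty batch is valid; the runtime completes the commit
>> > >>> > >       return Collections.emptyList();
>> > >>> > >     }
>> > >>> > >     return batch;
>> > >>> > >   }
>> > >>> > >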
>> > >>> > > FWIW, I would be a little cautious about this approach. Many
>> times
>> > >>> it's
>> > >>> > > better to fail fast on invalid data; it might be worth it to at
>> > least
>> > >>> > allow
>> > >>> > > users to configure whether the connector fails on invalid data,
>> or
>> > >>> > silently
>> > >>> > > skips over it (which is what happens when transactions are
>> > aborted).
>> > >>> > >
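>> > >>> > > For example (a hypothetical config property, just to illustrate the idea):
>> > >>> > >
>> > >>> > >   if (recordIsInvalid) {
>> > >>> > >     if (config.failOnInvalidData) {
>> > >>> > >       // fail fast: the task fails and its open transaction is aborted for you
>> > >>> > >       throw new DataException("Invalid record in file " + fileName);
>> > >>> > >     } else {
>> > >>> > >       // silently skip: abort the transaction containing the bad data
>> > >>> > >       transactionContext.abortTransaction();
>> > >>> > >     }
>> > >>> > >   }
>> > >>> > >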
>> > >>> > > > Why is abort not working without adding the last record to the
>> > >>> list?
>> > >>> > >
>> > >>> > > Is it possible that there's a case where an abort is being
>> > requested
>> > >>> > while
>> > >>> > > the current transaction is empty (i.e., the task hasn't returned
>> > any
>> > >>> > > records from SourceTask::poll since the last transaction was
>> > >>> > > committed/aborted)? I think this may be a bug in the Connect
>> > >>> framework
>> > >>> > > where we don't check to see if a transaction is already open
>> when a
>> > >>> task
>> > >>> > > requests that a transaction be aborted, which can cause tasks to
>> > fail
>> > >>> > (see
>> > >>> > > https://issues.apache.org/jira/browse/KAFKA-14799 for more
>> > details).
>> > >>> > >
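>> > >>> > > Until that's fixed, one possible task-side workaround (just a sketch; the
>> > >>> > > counter name is made up) is to only request an abort when the task has
>> > >>> > > actually returned records since the last commit/abort:
>> > >>> > >
>> > >>> > >   if (recordsReturnedSinceLastTransaction > 0) {
>> > >>> > >     transactionContext.abortTransaction();
>> > >>> > >   }
>> > >>> > >   // otherwise the transaction is empty, so there is nothing to abort
>> > >>> > >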
>> > >>> > > Cheers,
>> > >>> > >
>> > >>> > > Chris
>> > >>> > >
>> > >>> > >
>> > >>> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <
>> nittybe...@gmail.com>
>> > >>> wrote:
>> > >>> > >
>> > >>> > > > Hi Chris,
>> > >>> > > >
>> > >>> > > > I am not sure if you are able to see the images I shared with you.
>> > >>> > > > Copying the code snippet below:
>> > >>> > > >
>> > >>> > > >   if (expectedRecordCount >= 0) {
>> > >>> > > >     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>> > >>> > > >     if (missingCount > 0) {
>> > >>> > > >       // fewer records in the file than the header declared
>> > >>> > > >       if (transactionContext != null) {
>> > >>> > > >         isMissedRecords = true;
>> > >>> > > >       } else {
>> > >>> > > >         throw new DataException(String.format(
>> > >>> > > >             "Missing %d records (expecting %d, actual %d)",
>> > >>> > > >             missingCount, expectedRecordCount, this.recordOffset()));
>> > >>> > > >       }
>> > >>> > > >     } else if (missingCount < 0) {
>> > >>> > > >       // more records in the file than the header declared
>> > >>> > > >       if (transactionContext != null) {
>> > >>> > > >         isMissedRecords = true;
>> > >>> > > >       } else {
>> > >>> > > >         throw new DataException(String.format(
>> > >>> > > >             "Too many records (expecting %d, actual %d)",
>> > >>> > > >             expectedRecordCount, this.recordOffset()));
>> > >>> > > >       }
>> > >>> > > >     }
>> > >>> > > >   }
>> > >>> > > >   addLastRecord(records, null, value);
>> > >>> > > > }
>> > >>> > > >
>> > >>> > > >
>> > >>> > > >
>> > >>> > > >   // asn1/binary abort path: abort on the last batch when the parse-error
>> > >>> > > >   // threshold is reached or the record count did not match the header
>> > >>> > > >   if ((config.parseErrorThreshold != null
>> > >>> > > >        && parseErrorCount >= config.parseErrorThreshold
>> > >>> > > >        && lastbatch && transactionContext != null)
>> > >>> > > >       || (isMissedRecords && transactionContext != null && lastbatch)) {
>> > >>> > > >     log.info("Transaction is aborting");
>> > >>> > > >     log.info("records = {}", records);
>> > >>> > > >     if (!records.isEmpty()) {
>> > >>> > > >       log.info("with record");
>> > >>> > > >       transactionContext.abortTransaction(records.get(records.size() - 1));
>> > >>> > > >     } else {
>> > >>> > > >       log.info("without record");
>> > >>> > > >       transactionContext.abortTransaction();
>> > >>> > > >     }
>> > >>> > > >
>> > >>> > > > Thanks,
>> > >>> > > > Nitty
>> > >>> > > >
>> > >>> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <
>> > nittybe...@gmail.com>
>> > >>> > > wrote:
>> > >>> > > >
>> > >>> > > >> Hi Chris,
>> > >>> > > >> Sorry for the typo in my previous email.
>> > >>> > > >>
>> > >>> > > >> Regarding point 2, *the task returns a batch of records from
>> > >>> > > >> SourceTask::poll (and, if using the per-record API provided by the
>> > >>> > > >> TransactionContext class, includes at least one record that should
>> > >>> > > >> trigger a transaction commit/abort in that batch)*
>> > >>> > > >> What if I am using the API without passing a record? We have 2 types
>> > >>> > > >> of use cases. In the first, on encountering an error record, we want
>> > >>> > > >> to commit the previous successful batches and disregard the failed
>> > >>> > > >> record and the upcoming batches. In this case we created the
>> > >>> > > >> transactionContext just before reading the file (the file is our
>> > >>> > > >> transaction boundary). Say I have a file with 5 records and the batch
>> > >>> > > >> size is 2, and in my 3rd batch I have one error record; then in that
>> > >>> > > >> batch I don't have a valid record to call commit or abort with. But I
>> > >>> > > >> want to commit all the previous batches that were successfully parsed.
>> > >>> > > >> How do I do that?
>> > >>> > > >>
>> > >>> > > >> The second use case is where I want to abort a transaction if the
>> > >>> > > >> record count doesn't match.
>> > >>> > > >> Code snippet:
>> > >>> > > >> [image: image.png]
>> > >>> > > >> There are no error records in this case. As you can see, I added the
>> > >>> > > >> transactionContext check to implement exactly-once. Without
>> > >>> > > >> transactions, the code just threw the exception without calling the
>> > >>> > > >> addLastRecord() method, and in the catch block it just logged the
>> > >>> > > >> message and returned the list of records, without the last record, to
>> > >>> > > >> poll(). To make it work, I called addLastRecord() in this case, so it
>> > >>> > > >> does not throw the exception and the list contains the last record as
>> > >>> > > >> well. Then I called the abort, and everything got aborted. Why is abort
>> > >>> > > >> not working without adding the last record to the list?
>> > >>> > > >>
>> > >>> > > >> Code to call abort:
>> > >>> > > >> [image: image.png]
>> > >>> > > >>
>> > >>> > > >>
>> > >>> > > >>
>> > >>> > > >>
>> > >>> > > >> Thanks,
>> > >>> > > >> Nitty
>> > >>> > > >>
>> > >>> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton
>> > >>> <chr...@aiven.io.invalid
>> > >>> > >
>> > >>> > > >> wrote:
>> > >>> > > >>
>> > >>> > > >>> Hi Nitty,
>> > >>> > > >>>
>> > >>> > > >>> I'm a little confused about what you mean by this part:
>> > >>> > > >>>
>> > >>> > > >>> > transaction is not getting completed because it is not committing
>> > >>> > > >>> > the transaction offset.
>> > >>> > > >>>
>> > >>> > > >>> The only conditions required for a transaction to be
>> completed
>> > >>> when a
>> > >>> > > >>> connector is defining its own transaction boundaries are:
>> > >>> > > >>>
>> > >>> > > >>> 1. The task requests a transaction commit/abort from the
>> > >>> > > >>> TransactionContext
>> > >>> > > >>> 2. The task returns a batch of records from SourceTask::poll
>> > >>> (and, if
>> > >>> > > >>> using
>> > >>> > > >>> the per-record API provided by the TransactionContext class,
>> > >>> includes
>> > >>> > > at
>> > >>> > > >>> least one record that should trigger a transaction
>> commit/abort
>> > >>> in
>> > >>> > that
>> > >>> > > >>> batch)
>> > >>> > > >>>
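>> > >>> > > >>> To make (2) concrete for the per-record API, a rough sketch (the helper
>> > >>> > > >>> name is illustrative):
>> > >>> > > >>>
>> > >>> > > >>>   List<SourceRecord> batch = readNextBatch();
>> > >>> > > >>>   SourceRecord last = batch.get(batch.size() - 1);
>> > >>> > > >>>   // ask for the transaction to be committed after 'last' is dispatched
>> > >>> > > >>>   transactionContext.commitTransaction(last);
>> > >>> > > >>>   return batch;  // 'last' must be part of the returned batch
>> > >>> > > >>>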
>> > >>> > > >>> The Connect runtime should automatically commit source
>> offsets
>> > to
>> > >>> > Kafka
>> > >>> > > >>> whenever a transaction is completed, either by commit or
>> abort.
>> > >>> This
>> > >>> > is
>> > >>> > > >>> because transactions should only be aborted for data that
>> > should
>> > >>> > never
>> > >>> > > be
>> > >>> > > >>> re-read by the connector; if there is a validation error
>> that
>> > >>> should
>> > >>> > be
>> > >>> > > >>> handled by reconfiguring the connector, then the task should
>> > >>> throw an
>> > >>> > > >>> exception instead of aborting the transaction.
>> > >>> > > >>>
>> > >>> > > >>> If possible, do you think you could provide a brief code
>> > snippet
>> > >>> > > >>> illustrating what your task is doing that's causing issues?
>> > >>> > > >>>
>> > >>> > > >>> Cheers,
>> > >>> > > >>>
>> > >>> > > >>> Chris (not Chrise 🙂)
>> > >>> > > >>>
>> > >>> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <
>> > >>> nittybe...@gmail.com>
>> > >>> > > >>> wrote:
>> > >>> > > >>>
>> > >>> > > >>> > Hi Chrise,
>> > >>> > > >>> >
>> > >>> > > >>> > Thanks for sharing the details.
>> > >>> > > >>> >
>> > >>> > > >>> > Regarding the use case: for the Asn1 source connector we have a use
>> > >>> > > >>> > case to validate the number of records in the file against the
>> > >>> > > >>> > number of records in the header. So currently, if validation fails
>> > >>> > > >>> > we do not send the last record to the topic. But after introducing
>> > >>> > > >>> > exactly-once with the connector transaction boundary, I can see that
>> > >>> > > >>> > if I call an abort when the validation fails, the transaction is not
>> > >>> > > >>> > getting completed because it is not committing the transaction
>> > >>> > > >>> > offset. I saw that the transaction state changed to CompleteAbort.
>> > >>> > > >>> > So for my next transaction I am getting InvalidProducerEpochException,
>> > >>> > > >>> > and then the task stopped after that. I tried calling the abort after
>> > >>> > > >>> > sending the last record to the topic, and then the transaction
>> > >>> > > >>> > completed.
>> > >>> > > >>> >
>> > >>> > > >>> > I don't know if I am doing anything wrong here.
>> > >>> > > >>> >
>> > >>> > > >>> > Please advise.
>> > >>> > > >>> > Thanks,
>> > >>> > > >>> > Nitty
>> > >>> > > >>> >
>> > >>> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton
>> > >>> > > <chr...@aiven.io.invalid
>> > >>> > > >>> >
>> > >>> > > >>> > wrote:
>> > >>> > > >>> >
>> > >>> > > >>> > > Hi Nitty,
>> > >>> > > >>> > >
>> > >>> > > >>> > > We've recently added some documentation on implementing
>> > >>> > > exactly-once
>> > >>> > > >>> > source
>> > >>> > > >>> > > connectors here:
>> > >>> > > >>> > >
>> > >>> > > >>> >
>> > >>> > > >>>
>> > >>> > >
>> > >>> >
>> > >>>
>> >
>> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
>> > >>> > > >>> > > .
>> > >>> > > >>> > > To quote a relevant passage from those docs:
>> > >>> > > >>> > >
>> > >>> > > >>> > > > In order for a source connector to take advantage of
>> this
>> > >>> > > support,
>> > >>> > > >>> it
>> > >>> > > >>> > > must be able to provide meaningful source offsets for
>> each
>> > >>> record
>> > >>> > > >>> that it
>> > >>> > > >>> > > emits, and resume consumption from the external system
>> at
>> > the
>> > >>> > exact
>> > >>> > > >>> > > position corresponding to any of those offsets without
>> > >>> dropping
>> > >>> > or
>> > >>> > > >>> > > duplicating messages.
>> > >>> > > >>> > >
>> > >>> > > >>> > > So, as long as your source connector is able to use the
>> > Kafka
>> > >>> > > Connect
>> > >>> > > >>> > > framework's offsets API correctly, it shouldn't be
>> > necessary
>> > >>> to
>> > >>> > > make
>> > >>> > > >>> any
>> > >>> > > >>> > > other code changes to the connector.
>> > >>> > > >>> > >
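>> > >>> > > >>> > > As an illustration, for a file-based source that might look like
>> > >>> > > >>> > > this (the partition/offset keys are only examples, not required
>> > >>> > > >>> > > names):
>> > >>> > > >>> > >
>> > >>> > > >>> > >   Map<String, Object> sourcePartition = Map.of("filename", fileName);
>> > >>> > > >>> > >   Map<String, Object> sourceOffset = Map.of("position", recordNumber);
>> > >>> > > >>> > >   SourceRecord record = new SourceRecord(
>> > >>> > > >>> > >       sourcePartition, sourceOffset, topic,
>> > >>> > > >>> > >       keySchema, key, valueSchema, value);
>> > >>> > > >>> > >   // on restart, the task reads the last committed offset back via
>> > >>> > > >>> > >   // context.offsetStorageReader() and resumes right after 'position'
>> > >>> > > >>> > >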
>> > >>> > > >>> > > To enable exactly-once support for source connectors on
>> > your
>> > >>> > > Connect
>> > >>> > > >>> > > cluster, see the docs section here:
>> > >>> > > >>> > >
>> > >>> > https://kafka.apache.org/documentation/#connect_exactlyoncesource
>> > >>> > > >>> > >
>> > >>> > > >>> > > With regard to transactions, a transactional producer is
>> > >>> always
>> > >>> > > >>> created
>> > >>> > > >>> > > automatically for your connector by the Connect runtime
>> > when
>> > >>> > > >>> exactly-once
>> > >>> > > >>> > > support is enabled on the worker. The only reason to set
>> > >>> > > >>> > > "transaction.boundary" to "connector" is if your
>> connector
>> > >>> would
>> > >>> > > >>> like to
>> > >>> > > >>> > > explicitly define its own transaction boundaries. In
>> this
>> > >>> case,
>> > >>> > it
>> > >>> > > >>> sounds
>> > >>> > > >>> > > like may be what you want; I just want to make sure to
>> call
>> > >>> out
>> > >>> > > that
>> > >>> > > >>> in
>> > >>> > > >>> > > either case, you should not be directly instantiating a
>> > >>> producer
>> > >>> > in
>> > >>> > > >>> your
>> > >>> > > >>> > > connector code, but let the Kafka Connect runtime do
>> that
>> > for
>> > >>> > you,
>> > >>> > > >>> and
>> > >>> > > >>> > just
>> > >>> > > >>> > > worry about returning the right records from
>> > SourceTask::poll
>> > >>> > (and
>> > >>> > > >>> > possibly
>> > >>> > > >>> > > defining custom transactions using the
>> TransactionContext
>> > >>> API).
>> > >>> > > >>> > >
>> > >>> > > >>> > > With respect to your question about committing or
>> aborting
>> > in
>> > >>> > > certain
>> > >>> > > >>> > > circumstances, it'd be useful to know more about your
>> use
>> > >>> case,
>> > >>> > > >>> since it
>> > >>> > > >>> > > may not be necessary to define custom transaction
>> > boundaries
>> > >>> in
>> > >>> > > your
>> > >>> > > >>> > > connector at all.
>> > >>> > > >>> > >
>> > >>> > > >>> > > Cheers,
>> > >>> > > >>> > >
>> > >>> > > >>> > > Chris
>> > >>> > > >>> > >
>> > >>> > > >>> > >
>> > >>> > > >>> > >
>> > >>> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <
>> > >>> nittybe...@gmail.com
>> > >>> > >
>> > >>> > > >>> wrote:
>> > >>> > > >>> > >
>> > >>> > > >>> > > > Hi Team,
>> > >>> > > >>> > > >
>> > >>> > > >>> > > > Adding on top of this, I tried creating a TransactionContext
>> > >>> > > >>> > > > object and calling the commitTransaction and abortTransaction
>> > >>> > > >>> > > > methods in source connectors. But the main problem I saw is that
>> > >>> > > >>> > > > if there is any error while parsing the record, Connect calls an
>> > >>> > > >>> > > > abort, but we have a use case to call commit in some cases. Is
>> > >>> > > >>> > > > that a valid use case in terms of Kafka Connect?
>> > >>> > > >>> > > >
>> > >>> > > >>> > > > Another question - should I use a transactional producer instead
>> > >>> > > >>> > > > of creating an object of TransactionContext? Below is the
>> > >>> > > >>> > > > connector configuration I am using.
>> > >>> > > >>> > > >
>> > >>> > > >>> > > >   exactly.once.support: "required"
>> > >>> > > >>> > > >   transaction.boundary: "connector"
>> > >>> > > >>> > > >
>> > >>> > > >>> > > > Could you please help me here?
>> > >>> > > >>> > > >
>> > >>> > > >>> > > > Thanks,
>> > >>> > > >>> > > > Nitty
>> > >>> > > >>> > > >
>> > >>> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <
>> > >>> > > nittybe...@gmail.com>
>> > >>> > > >>> > > wrote:
>> > >>> > > >>> > > >
>> > >>> > > >>> > > > > Hi Team,
>> > >>> > > >>> > > > > I am trying to implement exactly once behavior in
>> our
>> > >>> source
>> > >>> > > >>> > connector.
>> > >>> > > >>> > > > Is
>> > >>> > > >>> > > > > there any sample source connector implementation
>> > >>> available to
>> > >>> > > >>> have a
>> > >>> > > >>> > > look
>> > >>> > > >>> > > > > at?
>> > >>> > > >>> > > > > Regards,
>> > >>> > > >>> > > > > Nitty
>> > >>> > > >>> > > > >
>> > >>> > > >>> > > >
>> > >>> > > >>> > >
>> > >>> > > >>> >
>> > >>> > > >>>
>> > >>> > > >>
>> > >>> > >
>> > >>> >
>> > >>>
>> > >>
>> >
>>
>
