[https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Yu-Jhe Li updated KAFKA-13404:
------------------------------
Attachment: Main.scala
> Kafka sink connectors do not commit offset correctly if messages are produced
> in transaction
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-13404
> URL: https://issues.apache.org/jira/browse/KAFKA-13404
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.6.1
> Reporter: Yu-Jhe Li
> Priority: Major
> Attachments: Main.scala
>
>
> Kafka sink connectors do not commit offsets up to the latest log-end offset
> when the messages were produced in a transaction.
> From the code of [WorkerSinkTask.java|#L467], we found that the sink
> connector takes the offset from the consumed messages and commits it to Kafka
> after the messages are processed successfully. But for messages produced in a
> transaction, the log also contains additional records,
> [control batches|http://kafka.apache.org/documentation/#controlbatch], that
> indicate whether the transaction was committed or aborted. These control
> records occupy offsets but are never delivered to the connector, so an offset
> derived from the last message stops short of the log-end offset.
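To make the gap concrete, here is a minimal Java sketch of the offset arithmetic, assuming a simplified model of one partition in which offsets 0-9 hold the transactional data records and offset 10 holds the COMMIT marker, as in the segment dump in this issue. The class and method names (`OffsetFromRecords`, `offsetFromDeliveredRecords`) are hypothetical and are not Kafka or Connect APIs; the "last record offset + 1" rule is inferred from the committed offset of 10 reported by `kafka-consumer-groups.sh` below, not copied from WorkerSinkTask.

```java
// Hypothetical, simplified model of one partition; these are not Kafka or Connect classes.
final class OffsetFromRecords {
    // isControl[i] is true when offset i holds a transaction marker instead of data.
    static boolean[] reportedLog() {
        boolean[] isControl = new boolean[11]; // offsets 0-10, so the log-end offset is 11
        isControl[10] = true;                  // COMMIT marker at offset 10
        return isControl;
    }

    // Log-end offset = offset of the last entry + 1, control markers included.
    static long logEndOffset(boolean[] isControl) {
        return isControl.length;
    }

    // Offset derived from delivered records: the consumer does not hand control
    // markers to the application, so this is (offset of last data record) + 1.
    static long offsetFromDeliveredRecords(boolean[] isControl) {
        long next = 0;
        for (int offset = 0; offset < isControl.length; offset++) {
            if (!isControl[offset]) {
                next = offset + 1L;
            }
        }
        return next;
    }
}
```

With that log, `offsetFromDeliveredRecords` returns 10 while `logEndOffset` returns 11; the difference of 1 is the LAG reported for the group.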
>
> You can reproduce it by running `connect-file-sink` with the following
> properties:
> {noformat}
> /opt/kafka/bin/connect-standalone.sh /connect-standalone.properties /connect-file-sink.properties
> {noformat}
> {code:java}
> # connect-standalone.properties
> bootstrap.servers=localhost:9092
> key.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.storage.StringConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> # for testing
> offset.flush.interval.ms=10000
> consumer.isolation.level=read_committed
> consumer.auto.offset.reset=none
> {code}
> {code:java}
> # connect-file-sink.properties
> name=local-file-sink
> connector.class=FileStreamSink
> tasks.max=1
> file=/tmp/test.sink.txt
> topics=test{code}
> Then use the attached producer (Main.scala) to produce 10 messages to the
> `test` topic in a single transaction.
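> The topic's log-end offset is now 11, and the last record in the segment
> file is a control batch (the COMMIT marker at offset 10). But the consumer
> group's committed offset stays at 10, so the group reports a lag of 1. (If the
> record at offset 10 is later deleted by topic retention, restarting the
> connector fails with an OffsetOutOfRange exception, since
> `consumer.auto.offset.reset=none` gives the consumer no fallback position.)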
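The retention case follows from how a consumer validates a committed offset when it resumes. A minimal Java sketch, assuming the segment holding offsets 0-10 has been rolled and deleted so the log start offset becomes 11, and assuming a fetch offset is valid when it lies between the log start offset and the log-end offset inclusive. `ResumeCheck` is hypothetical, and `IllegalStateException` stands in for Kafka's OffsetOutOfRange error.

```java
// Hypothetical model of the check a consumer faces when resuming from a committed offset.
final class ResumeCheck {
    // Assumed validity rule: logStartOffset <= offset <= logEndOffset.
    static boolean inRange(long offset, long logStartOffset, long logEndOffset) {
        return offset >= logStartOffset && offset <= logEndOffset;
    }

    // With auto.offset.reset=none there is no fallback position, so an
    // out-of-range committed offset is an error. IllegalStateException stands
    // in for Kafka's OffsetOutOfRangeException in this sketch.
    static long resumeWithResetNone(long committed, long logStartOffset, long logEndOffset) {
        if (!inRange(committed, logStartOffset, logEndOffset)) {
            throw new IllegalStateException("offset " + committed + " is outside ["
                    + logStartOffset + ", " + logEndOffset + "]");
        }
        return committed;
    }
}
```

After retention, resuming from the committed offset 10 throws, while resuming from 11 (the log-end offset) would succeed.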
> {code:java}
> bash-5.1# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group connect-local-file-sink --describe
> GROUP                    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                HOST         CLIENT-ID
> connect-local-file-sink  test   0          10              11              1    connector-consumer-local-file-sink-0-10777adb-72c2-4fd3-8773-4f5a0498903d  /172.21.0.3  connector-consumer-local-file-sink-0
> bash-5.1# /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/test-0/00000000000000000000.log --print-data-log
> Dumping /kafka/test-0/00000000000000000000.log
> Starting offset: 0
> baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1634805907230 size: 208 magic: 2 compresscodec: GZIP crc: 2170304005 isvalid: true
> | offset: 0 CreateTime: 1634805907210 keysize: -1 valuesize: 39 sequence: 0 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 1 CreateTime: 1634805907230 keysize: -1 valuesize: 39 sequence: 1 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 2 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 2 headerKeys: [] payload: {"value": "ice", "time": 1634805907}
> | offset: 3 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 3 headerKeys: [] payload: {"value": "apple", "time": 1634805907}
> | offset: 4 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 4 headerKeys: [] payload: {"value": "home", "time": 1634805907}
> | offset: 5 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 5 headerKeys: [] payload: {"value": "juice", "time": 1634805907}
> | offset: 6 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 6 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> | offset: 7 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 7 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> | offset: 8 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 8 headerKeys: [] payload: {"value": "girl", "time": 1634805907}
> | offset: 9 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 9 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 208 CreateTime: 1634805908149 size: 78 magic: 2 compresscodec: NONE crc: 1662003889 isvalid: true
> | offset: 10 CreateTime: 1634805908149 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
> {code}
>
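> I think we should use
> [KafkaConsumer.position()|https://github.com/apache/kafka/blob/2.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1742]
> to get the correct offset instead of deriving it from the offsets of the
> messages. The consumer's position also advances past control records, so in
> the example above it should be 11 rather than 10.
> I will create a PR for that later.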
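A minimal Java sketch of why the consumer position gives a different answer, assuming a simplified consumer that reads the whole partition in one `poll()` and skips control markers while still advancing its position past them (the behavior the proposal relies on). `SimulatedConsumer` is a hypothetical stand-in, not `KafkaConsumer`; only the idea of reading `position()` after a poll comes from the proposal above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a read_committed consumer on one partition; NOT KafkaConsumer.
final class SimulatedConsumer {
    private final boolean[] isControl; // isControl[i] is true if offset i is a txn marker
    private long position;             // offset of the next entry to fetch

    SimulatedConsumer(boolean[] isControl, long startOffset) {
        this.isControl = isControl;
        this.position = startOffset;
    }

    // Returns the data offsets from the current position to the end of the log.
    // Control markers are not returned, but the position still moves past them.
    List<Long> poll() {
        List<Long> records = new ArrayList<>();
        while (position < isControl.length) {
            if (!isControl[(int) position]) {
                records.add(position);
            }
            position++;
        }
        return records;
    }

    long position() {
        return position;
    }
}
```

For the log in this issue (data at 0-9, COMMIT marker at 10), the last delivered record gives 9 + 1 = 10, while `position()` returns 11. One caveat for a real change: `position()` is the offset of the next record to fetch, so it is only safe to commit for a partition once every record returned by earlier polls has been delivered to the task and flushed.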
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)