[
https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yu-Jhe Li updated KAFKA-13404:
------------------------------
Description:
The Kafka sink connectors don't commit offsets up to the latest log-end offset when the messages were produced in a transaction.
From the code of [WorkerSinkTask.java|https://github.com/apache/kafka/blob/2.6.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L477], we found that the sink connector takes the offsets from the consumed messages and commits them to Kafka after the messages are processed successfully. But for messages produced in a transaction, the log contains additional [control batches|http://kafka.apache.org/documentation/#controlbatch] that mark the transaction as committed or aborted, and those batches are never delivered to the connector.
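As a minimal sketch of the problematic pattern (hypothetical code, not the actual `WorkerSinkTask` implementation), committing offsets derived only from the records returned by `poll()` looks roughly like this:
{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetFromRecordsSketch {
    // Hypothetical version of the pattern used by the sink task.
    static void pollAndCommit(KafkaConsumer<String, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            // Commit offset = offset of the last seen data record + 1.
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // For a committed transaction with data records at offsets 0..9 and a
        // COMMIT marker at offset 10, poll() returns only offsets 0..9, so this
        // commits 10 even though the consumer's position is already 11.
        consumer.commitSync(currentOffsets);
    }
}
{code}
The control batch is consumed internally by the client but never handed to the task, so no record ever carries its offset.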
You can reproduce it by running `connect-file-sink` with the following
properties:
{noformat}
/opt/kafka/bin/connect-standalone.sh /connect-standalone.properties /connect-file-sink.properties
{noformat}
{code:java}
# connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# for testing
offset.flush.interval.ms=10000
consumer.isolation.level=read_committed
consumer.auto.offset.reset=none
{code}
{code:java}
# connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=test
{code}
Then use the attached producer ([^Main.scala]) to produce 10 messages to the `test` topic in a single transaction.
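For reference, a transactional producer equivalent to the attached one can be sketched as follows (the `transactional.id` and the payload are assumptions):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "test-producer"); // assumed id
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test",
                        "{\"value\": \"banana\", \"time\": 1634805907}"));
            }
            // The commit writes a control batch (COMMIT marker) after the 10
            // data records, which occupies one extra offset in the log.
            producer.commitTransaction();
        }
    }
}
{code}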
You can see that the topic's log-end offset is now 11 and the last batch in the segment file is a control batch (the transaction's COMMIT marker), but the consumer group offset is still 10: the connector commits the offset of the last data record plus one, and it never sees the marker. (If the records are later deleted by topic retention, you will get an OffsetOutOfRange exception after restarting the connector, since offset 10 no longer exists and `consumer.auto.offset.reset=none` is set.)
{code:java}
bash-5.1# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group connect-local-file-sink --describe

GROUP                    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                 HOST         CLIENT-ID
connect-local-file-sink  test   0          10              11              1    connector-consumer-local-file-sink-0-10777adb-72c2-4fd3-8773-4f5a0498903d  /172.21.0.3  connector-consumer-local-file-sink-0

bash-5.1# /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/test-0/00000000000000000000.log --print-data-log
Dumping /kafka/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1634805907230 size: 208 magic: 2 compresscodec: GZIP crc: 2170304005 isvalid: true
| offset: 0 CreateTime: 1634805907210 keysize: -1 valuesize: 39 sequence: 0 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
| offset: 1 CreateTime: 1634805907230 keysize: -1 valuesize: 39 sequence: 1 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
| offset: 2 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 2 headerKeys: [] payload: {"value": "ice", "time": 1634805907}
| offset: 3 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 3 headerKeys: [] payload: {"value": "apple", "time": 1634805907}
| offset: 4 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 4 headerKeys: [] payload: {"value": "home", "time": 1634805907}
| offset: 5 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 5 headerKeys: [] payload: {"value": "juice", "time": 1634805907}
| offset: 6 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 6 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
| offset: 7 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 7 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
| offset: 8 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 8 headerKeys: [] payload: {"value": "girl", "time": 1634805907}
| offset: 9 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 9 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 208 CreateTime: 1634805908149 size: 78 magic: 2 compresscodec: NONE crc: 1662003889 isvalid: true
| offset: 10 CreateTime: 1634805908149 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
{code}
I think we should use [KafkaConsumer.position()|https://github.com/apache/kafka/blob/2.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1742] to get the correct offset to commit, instead of deriving it from the consumed messages. I will create a PR for that later.
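Applied to the hypothetical loop sketched above, the idea would look roughly like this (an assumption about the shape of the fix, not the actual patch):
{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetFromPositionSketch {
    static void pollAndCommit(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // ... hand the records to the sink task for processing ...

        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        for (TopicPartition tp : consumer.assignment()) {
            // position() is the offset of the next record to fetch; with
            // read_committed it has already advanced past any transaction
            // marker (11 in the example above, not 10).
            currentOffsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        }
        consumer.commitSync(currentOffsets);
    }
}
{code}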
> Kafka sink connectors do not commit offset correctly if messages are produced
> in transaction
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-13404
> URL: https://issues.apache.org/jira/browse/KAFKA-13404
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.6.1
> Reporter: Yu-Jhe Li
> Priority: Major
> Attachments: Main.scala
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)