[ https://issues.apache.org/jira/browse/KAFKA-15800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784341#comment-17784341 ]

Luke Chen commented on KAFKA-15800:
-----------------------------------

[~gharris1727], thanks for reporting this regression. Do you have an estimate 
of when this PR will be merged? I'm planning to create a v3.5.2 CR build soon. 
Thanks.

> Malformed connect source offsets corrupt other partitions with DataException
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-15800
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15800
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.5.0, 3.6.0, 3.5.1
>            Reporter: Greg Harris
>            Assignee: Greg Harris
>            Priority: Blocker
>             Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> The KafkaOffsetBackingStore consumer callback was recently augmented with a 
> call to OffsetUtils.processPartitionKey: 
> [https://github.com/apache/kafka/blob/f1e58a35d7aebbe72844faf3e5019d9aa7a85e4a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L323]
> This function deserializes the offset key, which may be malformed in the 
> topic: 
> [https://github.com/apache/kafka/blob/f1e58a35d7aebbe72844faf3e5019d9aa7a85e4a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java#L92]
> When this happens, a DataException is thrown and propagates to the 
> KafkaBasedLog try-catch surrounding the batch processing of the records: 
> [https://github.com/apache/kafka/blob/f1e58a35d7aebbe72844faf3e5019d9aa7a85e4a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L445-L454]
> For example:
> {noformat}
> ERROR Error polling: org.apache.kafka.connect.errors.DataException: 
> Converting byte[] to Kafka Connect data failed due to serialization error:  
> (org.apache.kafka.connect.util.KafkaBasedLog:453){noformat}
> This means that one DataException for a malformed record may cause the 
> remainder of the batch to be dropped, corrupting the in-memory state of the 
> KafkaOffsetBackingStore. This prevents tasks using the 
> KafkaOffsetBackingStore from seeing all of the offsets in the topics, and can 
> cause duplicate records to be emitted.
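To make the failure mode concrete, here is a minimal, simplified Java model of it. Everything in it is hypothetical: `OffsetBatchSketch`, the string-based records, and the local `DataException` and `processPartitionKey` stand-ins only mimic the roles of the real Kafka Connect classes and do not use their APIs. `consumeUnguarded` reproduces the reported shape (one try-catch around the whole batch), while `consumeIsolated` shows one way to contain a malformed key per record; it is a sketch, not necessarily how the actual fix was written.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetBatchSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.errors.DataException.
    static class DataException extends RuntimeException {
        DataException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for OffsetUtils.processPartitionKey: it "deserializes"
    // the key and throws if it is malformed (here: not wrapped in braces).
    static void processPartitionKey(String rawKey, List<String> partitions) {
        if (!rawKey.startsWith("{") || !rawKey.endsWith("}")) {
            throw new DataException("Converting byte[] to Kafka Connect data failed: " + rawKey);
        }
        partitions.add(rawKey);
    }

    // Shape of the reported bug: the batch loop sits inside a single try-catch,
    // so the first exception abandons every record after it.
    static Map<String, String> consumeUnguarded(List<String[]> batch, List<String> partitions) {
        Map<String, String> offsets = new LinkedHashMap<>();
        try {
            for (String[] record : batch) {
                offsets.put(record[0], record[1]);
                processPartitionKey(record[0], partitions);
            }
        } catch (DataException e) {
            System.err.println("Error polling: " + e.getMessage());
        }
        return offsets;
    }

    // Containment: the exception is caught per record, so a malformed key is
    // logged and the remaining records in the batch are still applied.
    static Map<String, String> consumeIsolated(List<String[]> batch, List<String> partitions) {
        Map<String, String> offsets = new LinkedHashMap<>();
        for (String[] record : batch) {
            offsets.put(record[0], record[1]);
            try {
                processPartitionKey(record[0], partitions);
            } catch (DataException e) {
                System.err.println("Skipping malformed offset key: " + e.getMessage());
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<String[]> batch = List.of(
                new String[] {"{p0}", "10"},
                new String[] {"not-a-key", "11"},
                new String[] {"{p1}", "12"});
        System.out.println("unguarded offsets: " + consumeUnguarded(batch, new ArrayList<>()).keySet());
        System.out.println("isolated offsets:  " + consumeIsolated(batch, new ArrayList<>()).keySet());
    }
}
```

With the three-record batch in `main`, the unguarded variant never stores the offset for `{p1}`, because the exception from the malformed second key aborts the loop; this is the in-memory gap described above. The isolated variant stores all three offsets and only skips tracking the malformed partition key.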



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
