[ https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924303#comment-17924303 ]
Jun Rao commented on KAFKA-18723: --------------------------------- [~jsancio] : Thanks for reporting this. This does seem to be an issue. When a fetch request is served in the middle of truncation/append, the follower could get different variants of incorrect data. The simple case is that the incorrect data is corrupted as reported in this jira. We could catch CorruptRecordException and handle it accordingly. A more subtle and rare case is that the truncation happens to align on the batch boundary and the incorrect data is not corrupted. This can cause incorrect data to be silently and permanently added to the log. Consider that following steps. 1. Replica A is the leader with epoch 10 and appends the following data (offset, leader epoch) to its log. HWM is at offset 101. Replica A's log (100, 10), (101, 10), (102, 10), (103, 10) 2. Replica B is a follower and fetches from A with lastEpoch=10 and fetchOffset=101 3. Replica A creates a FileRecords including offset 101 to 103. While the response is being sent, Replica C takes over as the new leader with epoch 11. Replica C only has offsets up to 101 at epoch 10 and appends two new records starting at offset 102 with epoch 11. Replica C's log (100, 10), (101, 10), (102, 11), (103, 11) 4. Replica A follows replica C. It truncates offset 102 and then fetches the new data. Replica A's log becomes the following, same as C. Replica A's log (100, 10), (101, 10), (102, 11), (103, 11) 5. Replica B finally receives the response from step 2. The response could include a mix of A's data in step 1 and in step 4, such as (101, 10), (102, 10), (103, 11). This will be appended to B's log, but (102, 10) doesn't match the data in replica C and is incorrect. 6. Replica B then follows and fetches from replica C with lastEpoch=11 and fetchOffset=104. Replica C verifies that lastEpoch/fetchOffset match the epoch in its local log and accepts the fetch request without reporting divergent epoch. Now, the incorrect (102, 10) will be permanently in replica B. As for the fix, one potential solution is for a follower to reject any batches whose leader epoch is higher than its current leader epoch. Intuitively, any batch with a higher epoch should be obtained from the new leader when the follower receives the new leader epoch. If we do this, in step 5, replica B can only append (101, 10), (102, 10) to its log. (103, 11) has a higher epoch than replica B and will be rejected. Then in step 6, replica B will fetch from replica C with lastEpoch=10 and fetchOffset=103. Replica C will detect there is log divergence since epoch 10 ends at offset 101 in its log. Once replica B receives the diverging epoch in the response, it will remove (102, 10) from its log. > KRaft must handle corrupted records in the fetch response > --------------------------------------------------------- > > Key: KAFKA-18723 > URL: https://issues.apache.org/jira/browse/KAFKA-18723 > Project: Kafka > Issue Type: Bug > Components: kraft > Reporter: José Armando García Sancio > Assignee: José Armando García Sancio > Priority: Major > Fix For: 3.9.1, 3.8.2, 3.7.3 > > > It is possible for a KRaft replica to send corrupted records to the fetching > replicas in the FETCH response. This is because there is a race between when > the FETCH response gets generated by the KRaft IO thread and when the network > thread, or linux kernel, reads the byte position in the log segment. > This race can generated corrupted records if the KRaft replica performed a > truncation after the FETCH response was created but before the network thread > read the bytes from the log segment. > I have seen the following errors: > {code:java} > [ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] > org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - > Encountered fatal fault: Unexpected error in raft IO thread > org.apache.kafka.common.KafkaException: Append failed unexpectedly > at > kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117) > at > kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110) > at > org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227) > at > org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209) > at > org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644) > at > org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770) > at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355) > at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code} > and > {code:java} > [ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] > org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - > Encountered fatal fault: Unexpected error in raft IO thread" > org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less > than the minimum record overhead (14)"{code} > This race also exists with Kafka's ISR based topic partition. In that case > the replica fetcher catches all CorruptRecordException and > InvalidRecordException. > {code:java} > } catch { > case ime@(_: CorruptRecordException | _: > InvalidRecordException) => > // we log the error and continue. This ensures two > things > // 1. If there is a corrupt message in a topic > partition, it does not bring the fetcher thread > // down and cause other topic partition to also lag > // 2. If the message is corrupt due to a transient > state in the log (truncation, partial writes > // can cause this), we simply continue and should > get fixed in the subsequent fetches > error(s"Found invalid messages during fetch for > partition $topicPartition " + > s"offset ${currentFetchState.fetchOffset}", ime) > partitionsWithError += topicPartition > {code} > The KRaft implementation doesn't handle this case: > {code:java} > } else { > Records records = > FetchResponse.recordsOrFail(partitionResponse); > if (records.sizeInBytes() > 0) { > appendAsFollower(records); > } > OptionalLong highWatermark = > partitionResponse.highWatermark() < 0 ? > OptionalLong.empty() : > OptionalLong.of(partitionResponse.highWatermark()); > updateFollowerHighWatermark(state, highWatermark); > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)