[ https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17926536#comment-17926536 ]

José Armando García Sancio commented on KAFKA-18723:
----------------------------------------------------

[~junrao] right, but that check is done on the leader. In essence, what we are 
trying to solve is the case where the partition leader epoch on the leader 
changed after the FETCH request was handled but before the log segment was 
read. Let's consider this ISR scenario; something similar can happen in KRaft:
 # The partition has replicas A, B and C, and they all agree that the leader epoch is 10 and the leader is A.
 # Replica B sends a FETCH request with a current leader epoch of 10 and a fetch offset of 101, similar to your example.
 # Replica A, the leader, handles the FETCH request, all of its validations pass, and it sends the FETCH response to the network layer.
 # The controller elects replica C as leader and updates all of the replicas (A, B, C), so the replicas now have a leader epoch of 11 and C as the leader.
 # Replica A truncates and appends its log in the same way as your example: 
(100, 10), (101, 10), (102, 11), (103, 11)
 # The network layer sends (101, 10), (102, 11), (103, 11) from A to B.
 # Replica B handles the FETCH response but appends up to and including (103, 11) because its own leader epoch is 11.

In short, the replicating replica cannot use its own leader epoch to determine 
up to which batch to append; it must instead use the leader epoch that was in 
effect when the FETCH request was sent and handled. What do you think, [~junrao]?
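
To make that rule concrete, here is a rough sketch (hypothetical names, not the actual KafkaRaftClient change): the follower walks the fetched batches and stops at the first batch whose epoch is newer than the epoch the FETCH request was sent with.
{code:java}
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;

// Hypothetical sketch only; FetchEpochFilter and safeAppendEndOffset are made-up names.
final class FetchEpochFilter {
    private FetchEpochFilter() { }

    // Returns the exclusive end offset up to which the follower may safely append,
    // bounded by the leader epoch that was in the FETCH request (10 in the scenario
    // above) rather than the follower's current leader epoch (11).
    static long safeAppendEndOffset(Records fetched, int fetchRequestLeaderEpoch, long fetchOffset) {
        long endOffset = fetchOffset;
        for (RecordBatch batch : fetched.batches()) {
            if (batch.partitionLeaderEpoch() > fetchRequestLeaderEpoch) {
                break; // (102, 11) and (103, 11) from the scenario would be dropped here
            }
            endOffset = batch.lastOffset() + 1;
        }
        return endOffset;
    }
}{code}
With a rule like this, replica B in the scenario would append only up to (101, 10) and pick up (102, 11) and (103, 11) from the new leader C on a later fetch.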

> KRaft must handle corrupted records in the fetch response
> ---------------------------------------------------------
>
>                 Key: KAFKA-18723
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18723
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>            Reporter: José Armando García Sancio
>            Assignee: José Armando García Sancio
>            Priority: Major
>             Fix For: 3.9.1, 3.8.2, 3.7.3
>
>
> It is possible for a KRaft replica to send corrupted records to the fetching 
> replicas in the FETCH response. This is because there is a race between when 
> the FETCH response gets generated by the KRaft IO thread and when the network 
> thread, or the Linux kernel, reads the bytes at that position in the log segment.
> This race can generate corrupted records if the KRaft replica performs a 
> truncation after the FETCH response has been created but before the network 
> thread has read the bytes from the log segment.
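> As a simplified, self-contained illustration of the race (plain FileChannel I/O rather than the zero-copy transfer Kafka actually uses; class and variable names here are made up), truncating the segment after a response has recorded a byte range but before that range is read produces a short read, which the fetcher then decodes as a corrupted batch:
> {code:java}
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
>
> public class TruncationRaceDemo {
>     public static void main(String[] args) throws IOException {
>         Path segment = Files.createTempFile("segment", ".log");
>         try (FileChannel log = FileChannel.open(segment,
>                 StandardOpenOption.READ, StandardOpenOption.WRITE)) {
>             // The "leader" has 1024 bytes of record data in the segment.
>             log.write(ByteBuffer.wrap(new byte[1024]), 0);
>
>             // The IO thread builds a FETCH response referencing bytes [0, 1024).
>             long position = 0;
>             int claimed = 1024;
>
>             // Before the network thread copies those bytes, the log is truncated.
>             log.truncate(512);
>
>             // The network thread now reads fewer bytes than the response claims,
>             // so the receiver sees a partial batch and raises an error such as
>             // "Record size 0 is less than the minimum record overhead (14)".
>             ByteBuffer dest = ByteBuffer.allocate(claimed);
>             int read = log.read(dest, position);
>             System.out.println("claimed " + claimed + " bytes, actually read " + read);
>         } finally {
>             Files.deleteIfExists(segment);
>         }
>     }
> }{code}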
> I have seen the following errors:
> {code:java}
> [ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.KafkaException: Append failed unexpectedly
>       at kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
>       at kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
>       at org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
>       at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
>       at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
>       at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
>       at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
>       at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
>       at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
> and
> {code:java}
> [ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14){code}
> This race also exists with Kafka's ISR-based topic partitions. In that case 
> the replica fetcher catches CorruptRecordException and InvalidRecordException:
> {code:scala}
>                     } catch {
>                       case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
>                         // we log the error and continue. This ensures two things
>                         // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
>                         //    down and cause other topic partition to also lag
>                         // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
>                         //    can cause this), we simply continue and should get fixed in the subsequent fetches
>                         error(s"Found invalid messages during fetch for partition $topicPartition " +
>                           s"offset ${currentFetchState.fetchOffset}", ime)
>                         partitionsWithError += topicPartition
> {code}
> The KRaft implementation doesn't handle this case:
> {code:java}
>               } else {
>                   Records records = FetchResponse.recordsOrFail(partitionResponse);
>                   if (records.sizeInBytes() > 0) {
>                       appendAsFollower(records);
>                   }
>                   OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
>                       OptionalLong.empty() :
>                       OptionalLong.of(partitionResponse.highWatermark());
>                   updateFollowerHighWatermark(state, highWatermark);
>               }{code}
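> A minimal sketch of how this path could mirror the replica fetcher's handling (not the committed fix; it assumes the surrounding class exposes an slf4j logger, and the log message is illustrative), so that a transient truncation race no longer terminates the raft IO thread:
> {code:java}
>               } else {
>                   try {
>                       Records records = FetchResponse.recordsOrFail(partitionResponse);
>                       if (records.sizeInBytes() > 0) {
>                           appendAsFollower(records);
>                       }
>                   } catch (CorruptRecordException | InvalidRecordException e) {
>                       // org.apache.kafka.common.errors.CorruptRecordException and
>                       // org.apache.kafka.common.InvalidRecordException: likely a transient
>                       // truncation race on the leader, so log and let the next fetch
>                       // retry instead of propagating to the fault handler.
>                       logger.info("Ignoring corrupted records in FETCH response; will retry", e);
>                   }
>                   OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
>                       OptionalLong.empty() :
>                       OptionalLong.of(partitionResponse.highWatermark());
>                   updateFollowerHighWatermark(state, highWatermark);
>               }{code}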


