[ https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924342#comment-17924342 ]

José Armando García Sancio commented on KAFKA-18723:
----------------------------------------------------

Thanks for the comment [~junrao]. Good catch on the example that results in an 
inconsistent log. I like your suggestion. It seems correct to me since the 
following replica, B in your example, can only append records up to the old 
leader's epoch (A). This is true in KRaft because replica B only handles the 
records in a FETCH response if its local epoch matches the remote replica's 
epoch.
{code:java}
          if (epoch < quorum.epoch() || error == Errors.UNKNOWN_LEADER_EPOCH) {
              // We have a larger epoch, so the response is no longer relevant
              return Optional.of(true);
          } else if (epoch > quorum.epoch()
              || error == Errors.FENCED_LEADER_EPOCH
              || error == Errors.NOT_LEADER_OR_FOLLOWER) {
              // The response indicates that the request had a stale epoch, but we need
              // to validate the epoch from the response against our current state.
              maybeTransition(leaderId, epoch, leaderEndpoints, currentTimeMs);
              return Optional.of(true);
          } ... {code}
For this solution to work for ISR partitions, the abstract replica fetcher needs 
to make sure that its local partition leader epoch still matches the remote 
partition leader epoch. Right now the replica fetcher only performs the following check.
{code:java}
              if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) { {code}
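A minimal sketch of what that extra check could look like, written in Java rather than the fetcher's Scala. The helper class and the {{requestLeaderEpoch}}/{{currentLeaderEpoch}} names are illustrative assumptions, not the actual fetch state fields:
{code:java}
import java.util.OptionalInt;

public final class EpochAwareFetchCheck {
    // Sketch only: process the partition data from a FETCH response only if both the
    // fetch offset and the leader epoch captured when the request was built still
    // match the partition's current fetch state.
    static boolean shouldProcess(long requestFetchOffset,
                                 OptionalInt requestLeaderEpoch,
                                 long currentFetchOffset,
                                 int currentLeaderEpoch,
                                 boolean isReadyForFetch) {
        return requestFetchOffset == currentFetchOffset
            && isReadyForFetch
            // Additional condition: ignore the response if the local leader epoch
            // changed while the FETCH request was in flight.
            && requestLeaderEpoch.isPresent()
            && requestLeaderEpoch.getAsInt() == currentLeaderEpoch;
    }
}
{code}
The point is simply to drop the response when the local leader epoch moved while the request was in flight, mirroring the KRaft epoch check shown earlier.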
 

After that, I think the implementation is relatively simple if we extend 
{{UnifiedLog#appendAsFollower}} to take the local leader epoch and slice the 
Records so that we only append the leading batches whose partition leader epoch 
is not greater than the current leader epoch. We don't need to introduce a new 
AppendOrigin since the same solution should apply to both KRaft and ISR partitions.
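
For illustration, a rough sketch of that slicing step against the public {{MemoryRecords}} and {{RecordBatch}} APIs. The method name and its placement are assumptions rather than the actual {{UnifiedLog}} change, and a complete fix would also have to tolerate a corrupted trailing batch, which this sketch does not attempt:
{code:java}
import java.nio.ByteBuffer;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

public final class EpochSliceSketch {
    // Keep only the leading batches whose partition leader epoch does not exceed the
    // follower's current leader epoch. Anything past that point may be the product of
    // the truncation race and is dropped; it will be re-fetched on a later FETCH.
    static MemoryRecords sliceToCurrentEpoch(MemoryRecords records, int currentLeaderEpoch) {
        int validBytes = 0;
        for (RecordBatch batch : records.batches()) {
            if (batch.partitionLeaderEpoch() > currentLeaderEpoch) {
                break;
            }
            validBytes += batch.sizeInBytes();
        }
        if (validBytes == records.sizeInBytes()) {
            return records;
        }
        // Truncate the backing buffer to the validated prefix.
        ByteBuffer buffer = records.buffer();
        buffer.limit(buffer.position() + validBytes);
        return MemoryRecords.readableRecords(buffer.slice());
    }
}
{code}
Since subsequent FETCH requests start from the follower's log end offset, any bytes dropped here are simply requested again once the epochs line up.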

> KRaft must handle corrupted records in the fetch response
> ---------------------------------------------------------
>
>                 Key: KAFKA-18723
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18723
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>            Reporter: José Armando García Sancio
>            Assignee: José Armando García Sancio
>            Priority: Major
>             Fix For: 3.9.1, 3.8.2, 3.7.3
>
>
> It is possible for a KRaft replica to send corrupted records to the fetching 
> replicas in the FETCH response. This is because there is a race between when 
> the FETCH response gets generated by the KRaft IO thread and when the network 
> thread, or the Linux kernel, reads the bytes at that position in the log segment.
> This race can generate corrupted records if the KRaft replica performed a 
> truncation after the FETCH response was created but before the network thread 
> read the bytes from the log segment.
> I have seen the following errors:
> {code:java}
>  [ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.KafkaException: Append failed unexpectedly
>       at kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
>       at kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
>       at org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
>       at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
>       at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
>       at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
>       at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
>       at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
>       at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
> and
> {code:java}
>  [ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14){code}
> This race also exists with Kafka's ISR-based topic partitions. In that case 
> the replica fetcher catches CorruptRecordException and 
> InvalidRecordException.
> {code:java}
>                     } catch {
>                       case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
>                         // we log the error and continue. This ensures two things
>                         // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
>                         //    down and cause other topic partition to also lag
>                         // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
>                         //    can cause this), we simply continue and should get fixed in the subsequent fetches
>                         error(s"Found invalid messages during fetch for partition $topicPartition " +
>                           s"offset ${currentFetchState.fetchOffset}", ime)
>                         partitionsWithError += topicPartition
>  {code}
> The KRaft implementation doesn't handle this case:
> {code:java}
>               } else {
>                   Records records = FetchResponse.recordsOrFail(partitionResponse);
>                   if (records.sizeInBytes() > 0) {
>                       appendAsFollower(records);
>                   }
>                   OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
>                       OptionalLong.empty() :
>                       OptionalLong.of(partitionResponse.highWatermark());
>                   updateFollowerHighWatermark(state, highWatermark);
>               }{code}



