jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r564212153



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest(
         FetchRequestData.FetchPartition request,
         long currentTimeMs
     ) {
-        Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
-        if (errorOpt.isPresent()) {
-            return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-        }
+        try {
+            Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+            if (errorOpt.isPresent()) {
+                return buildEmptyFetchResponse(errorOpt.get(), 
Optional.empty());
+            }
 
-        long fetchOffset = request.fetchOffset();
-        int lastFetchedEpoch = request.lastFetchedEpoch();
-        LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-        } else {
-            LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+            long fetchOffset = request.fetchOffset();
+            int lastFetchedEpoch = request.lastFetchedEpoch();
+            LeaderState state = quorum.leaderStateOrThrow();
+            ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
 
-            if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
-                onUpdateLeaderHighWatermark(state, currentTimeMs);
+            final Records records;
+            if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+                LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
+
+                if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
+                    onUpdateLeaderHighWatermark(state, currentTimeMs);
+                }
+
+                records = info.records;
+            } else {
+                records = MemoryRecords.EMPTY;
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+            return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
+        } catch (Exception e) {
+            logger.error("Caught unexpected error in fetch completion of 
request {}", request, e);
+            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, 
Optional.empty());
         }
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
         }
 
-        OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch)
-            .orElse(new OffsetAndEpoch(-1L, -1));
-        if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
-            return Optional.of(endOffsetAndEpoch);
+
+        Optional<OffsetAndEpoch> endOffsetAndEpochOpt = log
+            .endOffsetForEpoch(lastFetchedEpoch)
+            .flatMap(endOffsetAndEpoch -> {
+                if (endOffsetAndEpoch.epoch == lastFetchedEpoch && 
endOffsetAndEpoch.offset == log.startOffset()) {

Review comment:
       Note that the latest implementation changed a bit but this comment 
should still apply.
   
   Yeah. I think we can't trust the `endOffsetAndEpoch` returned by 
`kafka.log.Log` when the epoch is smaller than any known epoch in the log. We 
know this is the case if `endOffsetForEpoch` returns `new 
OffsetAndEpoch(log.startOffset(), lastFetchedEpoch)`. If `endOffsetForEpoch` 
returns this value we overwrite it to the `oldestSnapshotId`. In other words if 
the leader has the following state:
   
   oldestSnapshotId: endOffset = 10, epoch = 2
   logStartOffset: 10
   leaderEpochCache: (epoch = 4, startOffset = 10), ...
   
   If the follower sends the Fetch request for offset 11 and last fetched 
offset 3, then the leader should return a diverging epoch of 
OffsetAndEpoch(offset=10, epoch=2). If we don't do this overwrite the leader 
would return OffsetAndEpoch(offset=10, epoch=3).

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest(
         FetchRequestData.FetchPartition request,
         long currentTimeMs
     ) {
-        Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
-        if (errorOpt.isPresent()) {
-            return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-        }
+        try {
+            Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+            if (errorOpt.isPresent()) {
+                return buildEmptyFetchResponse(errorOpt.get(), 
Optional.empty());
+            }
 
-        long fetchOffset = request.fetchOffset();
-        int lastFetchedEpoch = request.lastFetchedEpoch();
-        LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-        } else {
-            LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+            long fetchOffset = request.fetchOffset();
+            int lastFetchedEpoch = request.lastFetchedEpoch();
+            LeaderState state = quorum.leaderStateOrThrow();
+            ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
 
-            if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
-                onUpdateLeaderHighWatermark(state, currentTimeMs);
+            final Records records;
+            if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+                LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
+
+                if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
+                    onUpdateLeaderHighWatermark(state, currentTimeMs);
+                }
+
+                records = info.records;
+            } else {
+                records = MemoryRecords.EMPTY;
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+            return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
+        } catch (Exception e) {
+            logger.error("Caught unexpected error in fetch completion of 
request {}", request, e);
+            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, 
Optional.empty());
         }
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
         }
 
-        OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch)
-            .orElse(new OffsetAndEpoch(-1L, -1));
-        if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
-            return Optional.of(endOffsetAndEpoch);
+
+        Optional<OffsetAndEpoch> endOffsetAndEpochOpt = log
+            .endOffsetForEpoch(lastFetchedEpoch)
+            .flatMap(endOffsetAndEpoch -> {
+                if (endOffsetAndEpoch.epoch == lastFetchedEpoch && 
endOffsetAndEpoch.offset == log.startOffset()) {

Review comment:
       Note that the latest implementation changed a bit but this comment 
should still apply.
   
   Yeah. I think we can't trust the `endOffsetAndEpoch` returned by 
`kafka.log.Log` when the epoch is smaller than any known epoch in the log. We 
know this is the case if `endOffsetForEpoch` returns `new 
OffsetAndEpoch(log.startOffset(), lastFetchedEpoch)`. If `endOffsetForEpoch` 
returns this value we overwrite it to the `oldestSnapshotId`. In other words if 
the leader has the following state:
   
   oldestSnapshotId: endOffset = 10, epoch = 2
   logStartOffset: 10
   leaderEpochCache: (epoch = 4, startOffset = 10), ...
   
   If the follower sends the Fetch request for offset 11 and last fetched 
offset 3, then the leader should return a diverging epoch of 
`OffsetAndEpoch(offset=10, epoch=2)`. If we don't do this overwrite the leader 
would return `OffsetAndEpoch(offset=10, epoch=3)`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -963,49 +979,111 @@ private FetchResponseData tryCompleteFetchRequest(
         FetchRequestData.FetchPartition request,
         long currentTimeMs
     ) {
-        Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
-        if (errorOpt.isPresent()) {
-            return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-        }
+        try {
+            Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+            if (errorOpt.isPresent()) {
+                return buildEmptyFetchResponse(errorOpt.get(), 
Optional.empty());
+            }
 
-        long fetchOffset = request.fetchOffset();
-        int lastFetchedEpoch = request.lastFetchedEpoch();
-        LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-        } else {
-            LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+            long fetchOffset = request.fetchOffset();
+            int lastFetchedEpoch = request.lastFetchedEpoch();
+            LeaderState state = quorum.leaderStateOrThrow();
+            ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
 
-            if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
-                onUpdateLeaderHighWatermark(state, currentTimeMs);
+            final Records records;
+            if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+                LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
+
+                if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
+                    onUpdateLeaderHighWatermark(state, currentTimeMs);
+                }
+
+                records = info.records;
+            } else {
+                records = MemoryRecords.EMPTY;
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+            return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
+        } catch (Exception e) {
+            logger.error("Caught unexpected error in fetch completion of 
request {}", request, e);
+            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, 
Optional.empty());
         }
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
         }
 
-        OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch)
-            .orElse(new OffsetAndEpoch(-1L, -1));
-        if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
-            return Optional.of(endOffsetAndEpoch);
+
+        Optional<OffsetAndEpoch> endOffsetAndEpochOpt = log
+            .endOffsetForEpoch(lastFetchedEpoch)
+            .flatMap(endOffsetAndEpoch -> {
+                if (endOffsetAndEpoch.epoch == lastFetchedEpoch && 
endOffsetAndEpoch.offset == log.startOffset()) {
+                    // This means that either:
+                    // 1. The lastFetchedEpoch is smaller than any known epoch
+                    // 2. The current leader epoch is lastFetchedEpoch and the 
log is empty.
+                    // Assume that there is not diverging information
+                    return Optional.empty();
+                } else {
+                    return Optional.of(endOffsetAndEpoch);
+                }
+            });
+        if (endOffsetAndEpochOpt.isPresent()) {
+            OffsetAndEpoch endOffsetAndEpoch = endOffsetAndEpochOpt.get();
+            if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
+                return 
ValidatedFetchOffsetAndEpoch.diverging(endOffsetAndEpoch);
+            } else {
+                return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
+            }
+        } else if (log.startOffset() > 0) {
+            OffsetAndEpoch oldestSnapshotId = 
log.oldestSnapshotId().orElseThrow(() -> {
+                return new IllegalStateException(
+                    String.format(
+                        "The log start offset (%s) was greater than zero but 
start snapshot was not found",
+                        log.startOffset()
+                    )
+                );
+            });
+
+            if (fetchOffset == log.startOffset() && lastFetchedEpoch == 
oldestSnapshotId.epoch) {

Review comment:
       Sorry @hachikuji but I changed this code again. I was having a hard time 
reading my implementation. Yes, I think you are right,we should be able to move 
this code into `KafkaMetadataLog`. Let me do that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to