jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r555992377



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1326,6 +1408,17 @@ private boolean handleFetchSnapshotResponse(
             // Finished fetching the snapshot.
             snapshot.freeze();
             state.setFetchingSnapshot(Optional.empty());
+
+            if (log.truncateFullyToLatestSnapshot()) {
+                updateFollowerHighWatermark(state, 
OptionalLong.of(log.highWatermark()));
+            } else {
+                logger.error(
+                    "Full log trunctation expected but didn't happend. 
Snapshot of {}, log end offset {}, last fetched {}",

Review comment:
       Updated the code to throw an exception instead of logging an error.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -970,40 +996,98 @@ private FetchResponseData tryCompleteFetchRequest(
         long fetchOffset = request.fetchOffset();
         int lastFetchedEpoch = request.lastFetchedEpoch();
         LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-        } else {
+        ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+        final Records records;
+        if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
             LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
 
             if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
                 onUpdateLeaderHighWatermark(state, currentTimeMs);
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+            records = info.records;
+        } else {
+            records = MemoryRecords.EMPTY;
         }
+
+        return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
+        }
+
+        if (fetchOffset < log.startOffset() || fetchOffset == 
log.startOffset()) {
+            // Snapshot must be present if start offset is non zero.
+            OffsetAndEpoch latestSnapshotId = 
log.latestSnapshotId().orElseThrow(() -> {
+                return new IllegalStateException(
+                    String.format(
+                        "The log start offset (%s) was greater than zero but 
no snapshot was found",
+                        log.startOffset()
+                    )
+                );
+            });
+
+            if (fetchOffset < log.startOffset() || lastFetchedEpoch != 
latestSnapshotId.epoch) {
+                return ValidatedFetchOffsetAndEpoch.snapshot(latestSnapshotId);
+            }
         }
 
         OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch)
             .orElse(new OffsetAndEpoch(-1L, -1));
         if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
-            return Optional.of(endOffsetAndEpoch);
-        } else {
-            return Optional.empty();
+            // TODO: Investiage this. Can the diverging offset be less than 
log start offset? If so, then we might as well
+            // avoid a round trip and return the snapshot id instead.
+            return ValidatedFetchOffsetAndEpoch.diverging(endOffsetAndEpoch);
+        }
+
+        return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
+    }
+
+    private final static class ValidatedFetchOffsetAndEpoch {

Review comment:
       Done. Moved it to `internals` package.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to