hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r565003308



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -131,6 +194,19 @@ class KafkaMetadataLog(
     }
   }
 
+  override def highWatermark: LogOffsetMetadata = {
+    val LogOffsetSnapshot(_, _, hwm, _) = log.fetchOffsetSnapshot
+    val segmentPosition: Optional[OffsetMetadata] = if (hwm.segmentBaseOffset 
!= Log.UnknownOffset &&

Review comment:
       Maybe use `hwm.messageOffsetOnly`?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +160,22 @@ class KafkaMetadataLog(
     log.truncateTo(offset)
   }
 
+  override def maybeTruncateFullyToLatestSnapshot(): Boolean = {

Review comment:
       Having second thoughts about the name here (I know I suggested the 
`maybe`). Since there is a returned value, perhaps it is clear enough already 
and we can try a more concise name. How about just `truncateToLatestSnapshot`? 

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest(
         FetchRequestData.FetchPartition request,
         long currentTimeMs
     ) {
-        Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
-        if (errorOpt.isPresent()) {
-            return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-        }
+        try {
+            Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+            if (errorOpt.isPresent()) {
+                return buildEmptyFetchResponse(errorOpt.get(), 
Optional.empty());
+            }
 
-        long fetchOffset = request.fetchOffset();
-        int lastFetchedEpoch = request.lastFetchedEpoch();
-        LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-        } else {
-            LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+            long fetchOffset = request.fetchOffset();
+            int lastFetchedEpoch = request.lastFetchedEpoch();
+            LeaderState state = quorum.leaderStateOrThrow();
+            ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+            final Records records;
+            if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+                LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
 
-            if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
-                onUpdateLeaderHighWatermark(state, currentTimeMs);
+                if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
+                    onUpdateLeaderHighWatermark(state, currentTimeMs);
+                }
+
+                records = info.records;
+            } else {
+                records = MemoryRecords.EMPTY;
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+            return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
+        } catch (Exception e) {
+            logger.error("Caught unexpected error in fetch completion of 
request {}", request, e);
+            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, 
Optional.empty());
         }
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
         }
 
-        OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch)
-            .orElse(new OffsetAndEpoch(-1L, -1));
-        if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
-            return Optional.of(endOffsetAndEpoch);
+
+        OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> {
+            return new IllegalStateException(
+                String.format(
+                    "Expected to find an end offset for epoch %s since it must 
be less than the current epoch %s",
+                    lastFetchedEpoch,
+                    quorum.epoch()
+                )
+            );
+        });
+
+        if (log.oldestSnapshotId().isPresent() &&
+            ((fetchOffset < log.startOffset()) ||
+             (fetchOffset == log.startOffset() && lastFetchedEpoch != 
log.oldestSnapshotId().get().epoch) ||
+             (lastFetchedEpoch < log.oldestSnapshotId().get().epoch))) {

Review comment:
       Related to my other comment, but I think this check can go away if we 
change `endOffsetForEpoch` to only return the end offset and epoch when they 
can be definitively determined.
   
   The other condition where we need to return a snapshot is when the returned 
epoch `endOffsetAndEpoch.epoch` equals the oldest snapshot epoch and the fetch 
offset is less than the oldest snapshot offset, which I think is equivalent to 
what is here. So I think we can write it like this: 
   ```
   
   OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch)
   if (!endOffsetAndEpoch.isPresent) {
     // return latest snapshot
   }
   
   OffsetAndEpoch oldestSnapshotId = log.oldestSnapshotId().getOrElse(new 
OffsetAndEpoch(0, 0));
   if (endOffsetAndEpoch.epoch == oldestSnapshotId.epoch && fetchOffset < 
oldestSnapshotId.offset) {
     // return latest snapshot
   } else if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
     // return diverging epoch
   } else {
     // valid fetch
   }
   ```
   

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +223,102 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // Do not let the state machine create snapshots older than the latest 
snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
+    } catch {
+      case _: NoSuchFileException =>
+        Optional.empty()
+    }
+  }
+
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    oldestSnapshotId
+  }
+
+  override def onSnapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+    snapshotIds.add(snapshotId)
+  }
+
+  override def deleteToNewOldestSnapshotId(logStartSnapshotId: 
raft.OffsetAndEpoch): Boolean = {

Review comment:
       nit: the name is a tad awkward. How about `deleteUpToSnapshot` or 
`deleteBeforeSnapshot`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest(
         FetchRequestData.FetchPartition request,
         long currentTimeMs
     ) {
-        Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
-        if (errorOpt.isPresent()) {
-            return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-        }
+        try {
+            Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+            if (errorOpt.isPresent()) {
+                return buildEmptyFetchResponse(errorOpt.get(), 
Optional.empty());
+            }
 
-        long fetchOffset = request.fetchOffset();
-        int lastFetchedEpoch = request.lastFetchedEpoch();
-        LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-        } else {
-            LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+            long fetchOffset = request.fetchOffset();
+            int lastFetchedEpoch = request.lastFetchedEpoch();
+            LeaderState state = quorum.leaderStateOrThrow();
+            ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+            final Records records;
+            if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+                LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
 
-            if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
-                onUpdateLeaderHighWatermark(state, currentTimeMs);
+                if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
+                    onUpdateLeaderHighWatermark(state, currentTimeMs);
+                }
+
+                records = info.records;
+            } else {
+                records = MemoryRecords.EMPTY;
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+            return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
+        } catch (Exception e) {
+            logger.error("Caught unexpected error in fetch completion of 
request {}", request, e);
+            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, 
Optional.empty());
         }
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));

Review comment:
       nit: should we use 0 for `fetchOffset` and `lastFetchedEpoch` here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to