jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r623370427



##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##########
@@ -426,48 +422,55 @@ public RawSnapshotWriter createSnapshot(OffsetAndEpoch 
snapshotId) {
     public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
 
     @Override
-    public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
-        if (logStartOffset() > logStartSnapshotId.offset ||
-            highWatermark.offset < logStartSnapshotId.offset) {
-
+    public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+        if (logStartOffset() > snapshotId.offset) {
+            throw new OffsetOutOfRangeException(
+                String.format(
+                    "New log start (%s) is less than the curent log start 
offset (%s)",
+                    snapshotId,
+                    logStartOffset()
+                )
+            );
+        }
+        if (highWatermark.offset < snapshotId.offset) {
             throw new OffsetOutOfRangeException(
                 String.format(
-                    "New log start (%s) is less than start offset (%s) or is 
greater than the high watermark (%s)",
-                    logStartSnapshotId,
-                    logStartOffset(),
+                    "New log start (%s) is greater than the high watermark 
(%s)",
+                    snapshotId,
                     highWatermark.offset
                 )
             );
         }
 
         boolean updated = false;
-        Optional<OffsetAndEpoch> snapshotIdOpt = latestSnapshotId();
-        if (snapshotIdOpt.isPresent()) {
-            OffsetAndEpoch snapshotId = snapshotIdOpt.get();
-            if (startOffset() < logStartSnapshotId.offset &&
-                highWatermark.offset >= logStartSnapshotId.offset &&
-                snapshotId.offset >= logStartSnapshotId.offset) {
+        if (snapshots.containsKey(snapshotId)) {
+            snapshots.headMap(snapshotId, false).clear();
 
-                snapshots.headMap(logStartSnapshotId, false).clear();
+            // Update the high watermark if it is less than the new log start 
offset
+            if (snapshotId.offset > highWatermark.offset) {

Review comment:
       You are correct. This not needed.

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -184,6 +187,22 @@ Builder appendToLog(int epoch, List<String> records) {
             return this;
         }
 
+        Builder withSnapshot(OffsetAndEpoch snapshotId) throws IOException {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to