jsancio commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r637470950



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2268,6 +2269,25 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         );
     }
 
+    private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+        Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+        if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset < snapshotId.offset) {
+            throw new KafkaException("Trying to creating snapshot with invalid snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " +
+                    highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
+                    "size of records between the latest snapshot and the high-watermark when creating snapshot");

Review comment:
       I see. I would remove that last sentence because I don't think this 
check enforces that and I don't think that it should.
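
A minimal sketch of what the check could look like with the last sentence of the message dropped. It is not the PR's final code: `SnapshotIdSketch` and `SnapshotIdValidatorSketch` are hypothetical stand-ins, the `OptionalLong` parameter stands in for `quorum().highWatermark()`, and `IllegalArgumentException` stands in for `KafkaException` so that the example compiles without Kafka on the classpath.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for Kafka's OffsetAndEpoch; only what this sketch needs.
final class SnapshotIdSketch {
    final long offset;
    final int epoch;

    SnapshotIdSketch(long offset, int epoch) {
        this.offset = offset;
        this.epoch = epoch;
    }

    @Override
    public String toString() {
        return "(offset=" + offset + ", epoch=" + epoch + ")";
    }
}

final class SnapshotIdValidatorSketch {
    // highWatermark stands in for quorum().highWatermark(); empty means it is not yet known.
    // IllegalArgumentException stands in for KafkaException to keep the sketch dependency-free.
    static void validateSnapshotId(SnapshotIdSketch snapshotId, OptionalLong highWatermark) {
        if (!highWatermark.isPresent() || highWatermark.getAsLong() < snapshotId.offset) {
            // The message only states what the check enforces: offset <= high-watermark.
            throw new IllegalArgumentException(
                "Trying to create snapshot with invalid snapshotId: " + snapshotId +
                " whose offset is larger than the high-watermark: " + highWatermark);
        }
    }

    public static void main(String[] args) {
        // A snapshot at the high-watermark is accepted.
        validateSnapshotId(new SnapshotIdSketch(10, 1), OptionalLong.of(10));
        try {
            // A snapshot past the high-watermark is rejected.
            validateSnapshotId(new SnapshotIdSketch(11, 1), OptionalLong.of(10));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The message says nothing about a minimum number of records between the latest snapshot and the high-watermark, because the condition does not test for one.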

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -108,6 +108,17 @@
 
     private final List<RaftResponse.Outbound> sentResponses = new ArrayList<>();
 
+    public static void advanceHighWatermark(RaftClientTestContext context,

Review comment:
       How about making it an instance method by removing the `static` 
keyword so that the user can call it as follows:
   ```
   context.advanceLeaderHighWatermarkToEndOffset()
   ```
   Note that I renamed the method to include "leader" and removed all of the 
parameters. I could be wrong, but `RaftClientTestContext` should have all of the 
information it needs to implement this. Also note that if you want to 
generalize this, then a majority of the nodes need to send a `FETCH` request to 
the leader for the leader to advance the high-watermark.
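
To illustrate the majority requirement, here is a toy model, not the real `RaftClientTestContext`. `ToyLeaderContext` and every method on it except the suggested name `advanceLeaderHighWatermarkToEndOffset` are hypothetical, and the model ignores epochs and the real request/response plumbing. It only shows that a parameterless instance method can use state the context already holds, and that the high-watermark moves once a majority of voters have fetched up to the leader's end offset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a leader's view of the quorum (hypothetical; not Kafka's test context).
final class ToyLeaderContext {
    private final int localId;
    // voter id -> end offset that voter is known to have replicated
    private final Map<Integer, Long> replicatedOffsets = new HashMap<>();
    private long logEndOffset = 0L;

    ToyLeaderContext(int localId, List<Integer> voters) {
        this.localId = localId;
        for (int voter : voters) {
            replicatedOffsets.put(voter, 0L);
        }
    }

    // The leader appends records locally; its own replicated offset moves immediately.
    void append(int recordCount) {
        logEndOffset += recordCount;
        replicatedOffsets.put(localId, logEndOffset);
    }

    // Stand-in for a FETCH from a follower: its fetch offset tells the leader
    // how much of the log that follower has replicated.
    void handleFetch(int replicaId, long fetchOffset) {
        replicatedOffsets.put(replicaId, fetchOffset);
    }

    // The high-watermark is the largest offset replicated on a majority of voters.
    long highWatermark() {
        List<Long> sorted = new ArrayList<>(replicatedOffsets.values());
        sorted.sort(Collections.reverseOrder());
        int majority = sorted.size() / 2 + 1;
        return sorted.get(majority - 1);
    }

    // Instance method with no parameters: followers fetch at the end offset
    // until a majority has caught up and the high-watermark reaches it.
    void advanceLeaderHighWatermarkToEndOffset() {
        for (int voter : new ArrayList<>(replicatedOffsets.keySet())) {
            if (highWatermark() == logEndOffset) {
                break;
            }
            if (voter != localId) {
                handleFetch(voter, logEndOffset);
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> voters = new ArrayList<>();
        voters.add(0);
        voters.add(1);
        voters.add(2);
        ToyLeaderContext context = new ToyLeaderContext(0, voters);
        context.append(5);
        System.out.println("before: " + context.highWatermark());
        context.advanceLeaderHighWatermarkToEndOffset();
        System.out.println("after: " + context.highWatermark());
    }
}
```

In this model a quorum with a single voter needs no fetches at all, since the leader alone is the majority.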

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -135,4 +141,18 @@ public static void assertSnapshot(List<List<String>> 
batches, SnapshotReader<Str
 
         assertEquals(expected, actual);
     }
+
+    private RaftClientTestContext initContextAsLeaderAndAdvanceHighWatermark(OffsetAndEpoch snapshotId) throws Exception {

Review comment:
       Okay, but note that if the quorum is a single-node quorum then the 
high-watermark should advance simply by calling `context.client.poll()`. You can 
simplify this if you want by making the quorum a single-member quorum. See the 
test in `KafkaRaftClientTest.testLeaderAppendSingleMemberQuorum()`.





