Re: [PR] [lake/paimon] Skip readable snapshot recomputation when latest compacted snapshot already in ZK [fluss]

2026-02-11 Thread via GitHub


wuchong merged PR #2640:
URL: https://github.com/apache/fluss/pull/2640


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [lake/paimon] Skip readable snapshot recomputation when latest compacted snapshot already in ZK [fluss]

2026-02-11 Thread via GitHub


luoyuxia commented on PR #2640:
URL: https://github.com/apache/fluss/pull/2640#issuecomment-3883493270

   @wuchong I submitted a PR to skip readable snapshot recomputation when the latest compacted snapshot is already registered in Fluss (ZK) for Paimon deletion-vector tables. Could you please help review?





Re: [PR] [lake/paimon] Skip readable snapshot recomputation when latest compacted snapshot already in ZK [fluss]

2026-02-10 Thread via GitHub


Copilot commented on code in PR #2640:
URL: https://github.com/apache/fluss/pull/2640#discussion_r2791280551


##
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##
@@ -159,8 +166,43 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI
             return null;
         }
 
-        // todo: optimize in #2626, if the compacted snapshot exists in zk, we can
-        // skip the follow check
+        LakeSnapshot lastCompactedLakeSnapshot = null;
+
+        try {
+            // Attempt to retrieve the snapshot from Fluss.
+            // This is a blocking call to unwrap the future.
+            lastCompactedLakeSnapshot =
+                    flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+            // If the error is anything other than the snapshot simply not existing,
+            // we log a warning but do not interrupt the flow.
+            if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+                LOG.warn(
+                        "Failed to retrieve lake snapshot {} from Fluss. "
+                                + "Will attempt to advance readable snapshot as a fallback.",
+                        latestCompactedSnapshot.id(),
+                        cause);
+            }
+            // If LakeTableSnapshotNotExistException occurs, we silently fall through
+            // as it is an expected case when the snapshot hasn't been recorded yet.
+        }

Review Comment:
   The `catch (Exception e)` around `flussAdmin.getLakeSnapshot(...).get()` can swallow `InterruptedException` (or a wrapped interruption) without restoring the thread's interrupted status. In this codebase, blocking future waits typically catch `InterruptedException`, call `Thread.currentThread().interrupt()`, and then fail fast (or propagate). Please handle `InterruptedException` explicitly (and optionally `ExecutionException` separately), re-interrupt the thread, and avoid continuing with potentially-cancelled work.
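[Editor's note] The pattern the reviewer asks for can be sketched as follows. This is a minimal, hypothetical illustration (the class and method names are invented, not from the PR): catch `InterruptedException` and `ExecutionException` in separate branches, restore the thread's interrupted status, and fail fast instead of falling through with cancelled work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class InterruptSafeGet {

    /**
     * Blocks on the future, handling interruption explicitly.
     * On InterruptedException, the interrupted status is restored
     * before failing fast, so callers can still observe it.
     */
    static <T> T blockingGet(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupted status instead of swallowing it,
            // then abort rather than continue with cancelled work.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for snapshot", e);
        } catch (ExecutionException e) {
            // Unwrap the real failure cause for the caller.
            throw new IllegalStateException("Snapshot retrieval failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Happy path: a completed future returns its value directly.
        System.out.println(blockingGet(CompletableFuture.completedFuture("snapshot-42")));
    }
}
```

Under this shape, only the domain-specific `LakeTableSnapshotNotExistException` (unwrapped from the `ExecutionException` cause) would be treated as the expected "not recorded yet" case; interruption is never silently absorbed.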



##
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##
@@ -159,8 +166,43 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI
             return null;
         }
 
-        // todo: optimize in #2626, if the compacted snapshot exists in zk, we can
-        // skip the follow check
+        LakeSnapshot lastCompactedLakeSnapshot = null;
+
+        try {
+            // Attempt to retrieve the snapshot from Fluss.
+            // This is a blocking call to unwrap the future.
+            lastCompactedLakeSnapshot =
+                    flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+            // If the error is anything other than the snapshot simply not existing,
+            // we log a warning but do not interrupt the flow.
+            if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+                LOG.warn(
+                        "Failed to retrieve lake snapshot {} from Fluss. "
+                                + "Will attempt to advance readable snapshot as a fallback.",
+                        latestCompactedSnapshot.id(),
+                        cause);
+            }
+            // If LakeTableSnapshotNotExistException occurs, we silently fall through
+            // as it is an expected case when the snapshot hasn't been recorded yet.
+        }
+
+        // If we successfully retrieved a snapshot, we must validate its integrity.
+        if (lastCompactedLakeSnapshot != null) {
+            // Consistency check: the ID in Fluss must strictly match the expected compacted ID.
+            // If they differ, it indicates a critical state mismatch in the metadata.
+            checkState(
+                    lastCompactedLakeSnapshot.getSnapshotId() == latestCompactedSnapshot.id(),
+                    "Snapshot ID mismatch detected! Expected: %s, Actual in Fluss: %s",
+                    latestCompactedSnapshot.id(),
+                    lastCompactedLakeSnapshot.getSnapshotId());
+
+            // If the snapshot already exists and is valid, no further action (advancing) is
+            // required.
+            return null;
+        }

Review Comment:
   Returning `null` here changes the commit path to `LakeCommitResult.unknownReadableSnapshot(...)` (see `PaimonLakeCommitter`), which sets `earliestSnapshotIDToKeep = KEEP_ALL_PREVIOUS (-1)`. That effectively disables snapshot cleanup for subsequent commits and can cause unbounded growth of lake snapshot metadata/files. If the intent is to skip *recomputation* (and/or re-commit) w