Copilot commented on code in PR #2640:
URL: https://github.com/apache/fluss/pull/2640#discussion_r2791280551
##
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##
@@ -159,8 +166,43 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI
return null;
}
-// todo: optimize in #2626, if the compacted snapshot exists in zk, we can
-// skip the follow check
+LakeSnapshot lastCompactedLakeSnapshot = null;
+
+try {
+// Attempt to retrieve the snapshot from Fluss.
+// This is a blocking call to unwrap the future.
+lastCompactedLakeSnapshot =
+flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
+} catch (Exception e) {
+Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+// If the error is anything other than the snapshot simply not existing,
+// we log a warning but do not interrupt the flow.
+if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+LOG.warn(
+"Failed to retrieve lake snapshot {} from Fluss. "
++ "Will attempt to advance readable snapshot as a fallback.",
+latestCompactedSnapshot.id(),
+cause);
+}
+// If LakeTableSnapshotNotExistException occurs, we silently fall through
+// as it is an expected case when the snapshot hasn't been recorded yet.
+}
Review Comment:
The `catch (Exception e)` around `flussAdmin.getLakeSnapshot(...).get()` can
swallow `InterruptedException` (or a wrapped interruption) without restoring
the thread’s interrupted status. In this codebase, blocking future waits
typically catch `InterruptedException`, call
`Thread.currentThread().interrupt()`, and then fail fast (or propagate). Please
handle `InterruptedException` explicitly (and optionally `ExecutionException`
separately), re-interrupt the thread, and avoid continuing with
potentially-cancelled work.
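A minimal sketch of the handling suggested above, using only `java.util.concurrent`. The class `InterruptAwareWait`, the helper `getOrNull`, and `SnapshotNotExistException` (a stand-in for `LakeTableSnapshotNotExistException`) are hypothetical names for illustration, not part of the Fluss codebase, and throwing `IllegalStateException` is only one possible way to fail fast.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class InterruptAwareWait {

    /** Hypothetical stand-in for LakeTableSnapshotNotExistException. */
    static class SnapshotNotExistException extends RuntimeException {
        SnapshotNotExistException(String message) {
            super(message);
        }
    }

    /**
     * Blocks on the future. Returns null when the lookup fails (the fallback
     * path), but never swallows an interruption: the interrupted flag is
     * restored and the call fails fast.
     */
    static <T> T getOrNull(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                    "Interrupted while waiting for the lake snapshot", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // A missing snapshot is the expected case; anything else is logged.
            if (!(cause instanceof SnapshotNotExistException)) {
                System.err.println(
                        "Failed to retrieve lake snapshot, falling back: " + cause);
            }
            return null;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> found = CompletableFuture.completedFuture("snapshot-7");
        System.out.println("found: " + getOrNull(found));

        CompletableFuture<String> missing = new CompletableFuture<>();
        missing.completeExceptionally(new SnapshotNotExistException("not recorded yet"));
        System.out.println("missing: " + getOrNull(missing));
    }
}
```

Splitting the two `catch` clauses keeps the "snapshot not found" fallback intact while ensuring a cancelled or shut-down thread does not continue into the advance path.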
##
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##
@@ -159,8 +166,43 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI
return null;
}
-// todo: optimize in #2626, if the compacted snapshot exists in zk, we can
-// skip the follow check
+LakeSnapshot lastCompactedLakeSnapshot = null;
+
+try {
+// Attempt to retrieve the snapshot from Fluss.
+// This is a blocking call to unwrap the future.
+lastCompactedLakeSnapshot =
+flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
+} catch (Exception e) {
+Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+// If the error is anything other than the snapshot simply not existing,
+// we log a warning but do not interrupt the flow.
+if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+LOG.warn(
+"Failed to retrieve lake snapshot {} from Fluss. "
++ "Will attempt to advance readable snapshot as a fallback.",
+latestCompactedSnapshot.id(),
+cause);
+}
+// If LakeTableSnapshotNotExistException occurs, we silently fall through
+// as it is an expected case when the snapshot hasn't been recorded yet.
+}
+
+// If we successfully retrieved a snapshot, we must validate its integrity.
+if (lastCompactedLakeSnapshot != null) {
+// Consistency Check: The ID in Fluss must strictly match the expected compacted ID.
+// If they differ, it indicates a critical state mismatch in the metadata.
+checkState(
+lastCompactedLakeSnapshot.getSnapshotId() == latestCompactedSnapshot.id(),
+"Snapshot ID mismatch detected! Expected: %s, Actual in Fluss: %s",
+latestCompactedSnapshot.id(),
+lastCompactedLakeSnapshot.getSnapshotId());
+
+// If the snapshot already exists and is valid, no further action (advancing) is
+// required.
+return null;
+}
Review Comment:
Returning `null` here changes the commit path to
`LakeCommitResult.unknownReadableSnapshot(...)` (see `PaimonLakeCommitter`),
which sets `earliestSnapshotIDToKeep = KEEP_ALL_PREVIOUS (-1)`. That
effectively disables snapshot cleanup for subsequent commits and can cause
unbounded growth of lake snapshot metadata/files. If the intent is to skip
*recomputation* (and/or re-commit) w