sashapolo commented on code in PR #3189:
URL: https://github.com/apache/ignite-3/pull/3189#discussion_r1483996166


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java:
##########
@@ -117,50 +121,47 @@ public LowWatermark(
     /**
      * Starts the watermark manager.
      */
-    public void start() {
-        inBusyLock(busyLock, () -> {
-            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
-                    .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> {
-                        if (vaultEntry == null) {
-                            scheduleUpdateLowWatermarkBusy();
-
-                            return nullCompletedFuture();
-                        }
-
-                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
-
-                        return txManager.updateLowWatermark(lowWatermark)
-                                .thenApply(unused -> {
-                                    this.lowWatermark.set(lowWatermark);
-
-                                    runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
-
-                                    return lowWatermark;
-                                });
-                    }))
-                    .whenComplete((lowWatermark, throwable) -> {
-                        if (throwable != null) {
-                            if (!(throwable instanceof NodeStoppingException)) {
-                                LOG.error("Error getting low watermark", throwable);
+    public CompletableFuture<Void> start() {
+        return inBusyLockAsync(busyLock, () -> readLowWatermarkFromVault()
+                .thenCompose(lowWatermark -> inBusyLock(busyLock, () -> {
+                    if (lowWatermark == null) {
+                        LOG.info("Previous value of the low watermark was not found, will schedule to update it");
+
+                        scheduleUpdateLowWatermarkBusy();
+
+                        return nullCompletedFuture();
+                    }
+
+                    LOG.info(
+                            "Low watermark has been successfully retrieved from the vault and is scheduled to be updated: {}",
+                            lowWatermark
+                    );
+
+                    return txManager.updateLowWatermark(lowWatermark)
+                            .thenRun(() -> inBusyLock(busyLock, () -> {
+                                this.lowWatermark = lowWatermark;
+
+                                runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+                            }));
+                }))
+                .whenComplete((unused, throwable) -> {
+                    if (throwable != null && !(throwable instanceof NodeStoppingException)) {
+                        LOG.error("Error during the Watermark manager start", throwable);
+
+                        failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable));
+
+                        inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
+                    }
+                })
+        );
+    }
 
-                                failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable));
+    private CompletableFuture<HybridTimestamp> readLowWatermarkFromVault() {

Review Comment:
   fixed



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java:
##########
@@ -117,50 +121,47 @@ public LowWatermark(
     /**
      * Starts the watermark manager.
      */
-    public void start() {
-        inBusyLock(busyLock, () -> {
-            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
-                    .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> {
-                        if (vaultEntry == null) {
-                            scheduleUpdateLowWatermarkBusy();
-
-                            return nullCompletedFuture();
-                        }
-
-                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
-
-                        return txManager.updateLowWatermark(lowWatermark)
-                                .thenApply(unused -> {
-                                    this.lowWatermark.set(lowWatermark);
-
-                                    runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
-
-                                    return lowWatermark;
-                                });
-                    }))
-                    .whenComplete((lowWatermark, throwable) -> {
-                        if (throwable != null) {
-                            if (!(throwable instanceof NodeStoppingException)) {
-                                LOG.error("Error getting low watermark", throwable);
+    public CompletableFuture<Void> start() {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to