sashapolo commented on code in PR #3189:
URL: https://github.com/apache/ignite-3/pull/3189#discussion_r1483990046
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java:
##########
@@ -117,50 +121,47 @@ public LowWatermark(
/**
* Starts the watermark manager.
*/
- public void start() {
- inBusyLock(busyLock, () -> {
- vaultManager.get(LOW_WATERMARK_VAULT_KEY)
- .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> {
- if (vaultEntry == null) {
- scheduleUpdateLowWatermarkBusy();
-
- return nullCompletedFuture();
- }
-
- HybridTimestamp lowWatermark =
ByteUtils.fromBytes(vaultEntry.value());
-
- return txManager.updateLowWatermark(lowWatermark)
- .thenApply(unused -> {
- this.lowWatermark.set(lowWatermark);
-
-
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
-
- return lowWatermark;
- });
- }))
- .whenComplete((lowWatermark, throwable) -> {
- if (throwable != null) {
- if (!(throwable instanceof NodeStoppingException))
{
- LOG.error("Error getting low watermark",
throwable);
+ public CompletableFuture<Void> start() {
+ return inBusyLockAsync(busyLock, () -> readLowWatermarkFromVault()
+ .thenCompose(lowWatermark -> inBusyLock(busyLock, () -> {
+ if (lowWatermark == null) {
+ LOG.info("Previous value of the low watermark was not
found, will schedule to update it");
+
+ scheduleUpdateLowWatermarkBusy();
+
+ return nullCompletedFuture();
+ }
+
+ LOG.info(
+ "Low watermark has been successfully retrieved
from the vault and is scheduled to be updated: {}",
+ lowWatermark
+ );
+
+ return txManager.updateLowWatermark(lowWatermark)
+ .thenRun(() -> inBusyLock(busyLock, () -> {
+ this.lowWatermark = lowWatermark;
+
+
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+ }));
+ }))
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null && !(throwable instanceof
NodeStoppingException)) {
+ LOG.error("Error during the Watermark manager start",
throwable);
+
+ failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
+
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ }
+ })
+ );
+ }
- failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
+ private CompletableFuture<HybridTimestamp> readLowWatermarkFromVault() {
Review Comment:
There was an async call before, so I left it as-is. But it seems like we can
get away with a fully sync start, so I'll rewrite it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]