rpuch commented on code in PR #1938:
URL: https://github.com/apache/ignite-3/pull/1938#discussion_r1168664521


##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.configuration;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/**
+ * Low watermark configuration schema.

Review Comment:
   Do we have any additional information to put here? For example, that it relates to
the GC and that any data below the watermark should be considered non-existent?
Or a link to an IEP or something similar?



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java:
##########
@@ -0,0 +1,39 @@
+/**
+ * Low watermark configuration schema.
+ */
+@Config
+public class LowWatermarkConfigurationSchema {
+    /** Data availability time (in milliseconds). */

Review Comment:
   Is it true to say that LWM cannot exceed `now - dataAvailabilityTime`? If 
yes, I think it makes sense to add this invariant here to make it easier for 
the reader to understand what's being configured.
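If the answer is yes, the invariant follows from the candidate formula in `createNewLowWatermarkCandidate()` whenever the clock-skew term is non-negative. A minimal sketch, assuming timestamps are reduced to their physical part in milliseconds (the real `HybridTimestamp` also has a logical part); `LwmInvariantSketch` and its methods are hypothetical names, not Ignite API:

```java
/** Hypothetical helper illustrating the invariant LWM <= now - dataAvailabilityTime (physical millis only). */
public final class LwmInvariantSketch {
    private LwmInvariantSketch() {
    }

    /** Physical part of the candidate, computed the same way as in createNewLowWatermarkCandidate(). */
    static long lowWatermarkCandidate(long nowMillis, long dataAvailabilityTimeMillis, long maxClockSkewMillis) {
        return nowMillis - dataAvailabilityTimeMillis - maxClockSkewMillis;
    }

    /** The invariant proposed in the review comment. */
    static boolean invariantHolds(long lowWatermarkMillis, long nowMillis, long dataAvailabilityTimeMillis) {
        return lowWatermarkMillis <= nowMillis - dataAvailabilityTimeMillis;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        long dataAvailabilityTime = 600_000L; // 10 minutes
        long maxClockSkew = 500L;             // assumed non-negative

        long lwm = lowWatermarkCandidate(now, dataAvailabilityTime, maxClockSkew);

        System.out.println(lwm);                                            // 399500
        System.out.println(invariantHolds(lwm, now, dataAvailabilityTime)); // true
    }
}
```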



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ */
+public class LowWatermarkManager implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkManager.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final MvGc mvGc;
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private final AtomicReference<HybridTimestamp> lowWatermark = new AtomicReference<>();
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     */
+    public LowWatermarkManager(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            MvGc mvGc
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.mvGc = mvGc;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    public void start() {
+        inBusyLock(busyLock, () -> {
+            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
+                    .thenApply(vaultEntry -> inBusyLock(busyLock, () -> {
+                        if (vaultEntry == null) {
+                            scheduleUpdateLowWatermarkBusy();
+
+                            return null;
+                        }
+
+                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
+
+                        this.lowWatermark.set(lowWatermark);
+
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+
+                        return lowWatermark;
+                    }))
+                    .whenComplete((lowWatermark, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) {
+                                LOG.error("Error getting low watermark", throwable);
+
+                                // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
+                            }
+                        } else {
+                            LOG.info(
+                                    "Low watermark has been successfully got from the vault and is scheduled to be updated: {}",
+                                    lowWatermark
+                            );
+                        }
+                    });
+        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        ScheduledFuture<?> lastScheduledTaskFuture = this.lastScheduledTaskFuture.get();
+
+        if (lastScheduledTaskFuture != null) {
+            lastScheduledTaskFuture.cancel(true);
+        }
+
+        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Returns the current low watermark, {@code null} means no low watermark has been assigned yet.
+     */
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return lowWatermark.get();
+    }
+
+    void updateLowWatermark() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = createNewLowWatermarkCandidate();
+
+            txManager.updateLowerBoundToStartNewReadOnlyTransaction(lowWatermarkCandidate);
+
+            txManager.getFutureAllReadOnlyTransactions(lowWatermarkCandidate)
+                    .thenCompose(unused -> inBusyLock(
+                            busyLock,
+                            () -> vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermarkCandidate)))
+                    )
+                    .thenRun(() -> inBusyLock(busyLock, () -> {
+                        lowWatermark.set(lowWatermarkCandidate);
+
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermarkCandidate);
+                    }))
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) {
+                                LOG.error("Failed to update low watermark, will schedule again: {}", throwable, lowWatermarkCandidate);
+
+                                inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
+                            }
+                        } else {
+                            LOG.info("Successful low watermark update: {}", lowWatermarkCandidate);
+                        }
+                    });
+        });
+    }
+
+    private void runGcAndScheduleUpdateLowWatermarkBusy(HybridTimestamp lowWatermark) {
+        mvGc.updateLowWatermark(lowWatermark);
+
+        scheduleUpdateLowWatermarkBusy();
+    }
+
+    private void scheduleUpdateLowWatermarkBusy() {
+        ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get();
+
+        assert previousScheduledFuture == null || previousScheduledFuture.isDone() : "previous scheduled task has not finished";
+
+        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
+                this::updateLowWatermark,
+                lowWatermarkConfig.updateFrequency().value(),
+                TimeUnit.MILLISECONDS
+        );
+
+        boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture);

Review Comment:
   Do we really need a CAS here if we know that we use a single-threaded 
executor to update it? It seems that just a volatile field would be enough if 
we drop the CAS.
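A minimal sketch of the volatile-field variant, assuming, as the comment does, that only one thread writes the field at a time: the thread calling `start()` first, then the single executor thread. `VolatileSchedulerSketch` and its methods are hypothetical and only mirror the shape of `scheduleUpdateLowWatermarkBusy()` and `close()`:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical periodic updater tracking its last scheduled run in a volatile field, not an AtomicReference. */
public final class VolatileSchedulerSketch implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /**
     * Written by one thread at a time (the caller of start() first, then the executor thread),
     * so a plain volatile write replaces compareAndSet; volatile keeps the value visible to close().
     */
    private volatile ScheduledFuture<?> lastScheduledTaskFuture;

    private final AtomicInteger runs = new AtomicInteger();

    private final CountDownLatch remainingRuns;

    private final long delayMillis;

    VolatileSchedulerSketch(int maxRuns, long delayMillis) {
        this.remainingRuns = new CountDownLatch(maxRuns);
        this.delayMillis = delayMillis;
    }

    void start() {
        scheduleNext();
    }

    private void scheduleNext() {
        lastScheduledTaskFuture = executor.schedule(this::update, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Stand-in for updateLowWatermark(): do the work, then reschedule from the executor thread. */
    private void update() {
        runs.incrementAndGet();
        remainingRuns.countDown();

        if (remainingRuns.getCount() > 0) {
            scheduleNext();
        }
    }

    /** Waits until all runs have happened (or the timeout expires) and returns the number of runs. */
    int awaitRuns(long timeoutMillis) throws InterruptedException {
        remainingRuns.await(timeoutMillis, TimeUnit.MILLISECONDS);

        return runs.get();
    }

    @Override
    public void close() throws InterruptedException {
        ScheduledFuture<?> last = lastScheduledTaskFuture;

        if (last != null) {
            last.cancel(true);
        }

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        try (VolatileSchedulerSketch scheduler = new VolatileSchedulerSketch(3, 10)) {
            scheduler.start();

            System.out.println(scheduler.awaitRuns(5_000)); // 3
        }
    }
}
```

One trade-off to weigh: the CAS plus assertion would detect a write race, for example the initial scheduling thread being preempted for longer than the delay so that the first run reschedules before the initial write lands. A plain volatile write would silently keep the stale future in that case.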



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+    private void scheduleUpdateLowWatermarkBusy() {
+        ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get();
+
+        assert previousScheduledFuture == null || previousScheduledFuture.isDone() : "previous scheduled task has not finished";
+
+        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
+                this::updateLowWatermark,
+                lowWatermarkConfig.updateFrequency().value(),
+                TimeUnit.MILLISECONDS
+        );
+
+        boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture);
+
+        assert casResult : "only one scheduled task is expected";
+    }
+
+    HybridTimestamp createNewLowWatermarkCandidate() {
+        HybridTimestamp now = clock.now();
+
+        long newPhysicalTime = now.getPhysical() - lowWatermarkConfig.dataAvailabilityTime().value() - getMaxClockSkew();
+
+        HybridTimestamp lowWatermarkCandidate = new HybridTimestamp(newPhysicalTime, now.getLogical());

Review Comment:
   How about adding a method for adding physical time (like
`addPhysicalTime(long millis)`) to the `HybridTimestamp` class? It would be
useful for schema sync as well.
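A minimal sketch of the proposed helper on a simplified, hypothetical `TimestampSketch` class standing in for `HybridTimestamp` (its real internals are not shown in this PR); only the getter names `getPhysical()` and `getLogical()` come from the code above. Keeping the logical part unchanged matches what `createNewLowWatermarkCandidate()` does today:

```java
/**
 * Hypothetical, simplified stand-in for HybridTimestamp (physical millis + logical counter),
 * used only to illustrate the proposed addPhysicalTime(long millis) method.
 */
public final class TimestampSketch {
    private final long physical;

    private final int logical;

    public TimestampSketch(long physical, int logical) {
        if (physical < 0 || logical < 0) {
            throw new IllegalArgumentException("physical and logical parts must be non-negative");
        }

        this.physical = physical;
        this.logical = logical;
    }

    public long getPhysical() {
        return physical;
    }

    public int getLogical() {
        return logical;
    }

    /** Returns a new timestamp shifted by {@code millis} (negative to go back); the logical part is kept. */
    public TimestampSketch addPhysicalTime(long millis) {
        return new TimestampSketch(Math.addExact(physical, millis), logical);
    }

    public static void main(String[] args) {
        TimestampSketch now = new TimestampSketch(1_000_000L, 7);
        long dataAvailabilityTime = 600_000L;
        long maxClockSkew = 500L;

        // Same result as the manual arithmetic in createNewLowWatermarkCandidate().
        TimestampSketch candidate = now.addPhysicalTime(-(dataAvailabilityTime + maxClockSkew));

        System.out.println(candidate.getPhysical() + ":" + candidate.getLogical()); // 399500:7
    }
}
```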



##########
modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java:
##########
@@ -277,12 +277,19 @@ public int finished() {
 
         @Override
         public void start() {
+        }
 
+        @Override
+        public void stop() {
         }
 
         @Override
-        public void stop() throws Exception {
+        public void updateLowerBoundToStartNewReadOnlyTransaction(@Nullable HybridTimestamp lowerBound) {
+        }
 
+        @Override
+        public CompletableFuture<Void> getFutureAllReadOnlyTransactions(HybridTimestamp timestamp) {
+            return null;

Review Comment:
   Let's return `completedFuture(null)`, because returning `null` breaks the contract of the
method. It's OK for now, but something might change in the future and the method
might get called even in tests.
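A minimal sketch of the suggested stub, assuming a hypothetical one-method interface `RoTxWaiter` in place of `TxManager` and a `long` in place of `HybridTimestamp`. A completed future lets callers chain on the result, as `updateLowWatermark()` does with `thenCompose`, whereas `null` would fail with a `NullPointerException` at the first chained call:

```java
import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletableFuture;

/** Hypothetical slice of the TxManager interface, reduced to the method discussed in the comment. */
interface RoTxWaiter {
    CompletableFuture<Void> getFutureAllReadOnlyTransactions(long timestamp);
}

public final class NoOpTxManagerStubSketch {
    /** Test stub: there is nothing to wait for, so return an already completed future instead of null. */
    static final RoTxWaiter STUB = timestamp -> completedFuture(null);

    public static void main(String[] args) {
        // Chaining works because the stub honours the non-null contract.
        String result = STUB.getFutureAllReadOnlyTransactions(42L)
                .thenApply(unused -> "chained")
                .join();

        System.out.println(result); // chained
    }
}
```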



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -124,4 +128,20 @@ CompletableFuture<Void> cleanup(
      */
     @TestOnly
     int finished();
+
+    /**
+     * Updates the lower bound (exclusive) of the timestamp to start new read-only transactions.
+     *
+     * <p>All new read-only transactions will have to start strictly greater this lower bound.
+     *
+     * @param lowerBound New lower bound, {@code null} if there is no lower bound.
+     */
+    void updateLowerBoundToStartNewReadOnlyTransaction(@Nullable HybridTimestamp lowerBound);

Review Comment:
   Why does the parameter need to be nullable? `LowWatermarkManager` always passes
a non-null value here.
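If no caller ever needs to clear the bound, the parameter can be non-null and "no lower bound yet" becomes the initial state instead of an argument value. A minimal sketch under that assumption, with timestamps reduced to `long` and a hypothetical `RoLowerBoundSketch` class (not Ignite API); the check follows the Javadoc's exclusive (strictly greater) semantics:

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical guard for starting read-only transactions, with timestamps reduced to long millis.
 * The lower bound is exclusive: a new RO transaction must read strictly above it.
 */
public final class RoLowerBoundSketch {
    /** Long.MIN_VALUE models "no lower bound assigned yet", so the setter never needs a null argument. */
    private final AtomicLong lowerBound = new AtomicLong(Long.MIN_VALUE);

    /** Non-null by construction: the caller shown in this PR always has a value to pass. */
    void updateLowerBound(long newLowerBound) {
        lowerBound.set(newLowerBound);
    }

    boolean canStartReadOnly(long readTimestamp) {
        return readTimestamp > lowerBound.get();
    }

    public static void main(String[] args) {
        RoLowerBoundSketch guard = new RoLowerBoundSketch();

        System.out.println(guard.canStartReadOnly(5));  // true: no bound yet

        guard.updateLowerBound(10);

        System.out.println(guard.canStartReadOnly(10)); // false: the bound is exclusive
        System.out.println(guard.canStartReadOnly(11)); // true
    }
}
```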



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -124,4 +128,20 @@ CompletableFuture<Void> cleanup(
+    /**
+     * Returns the future of all read-only transactions up to the timestamp.
+     *
+     * @param timestamp Timestamp.
+     */
+    CompletableFuture<Void> getFutureAllReadOnlyTransactions(HybridTimestamp timestamp);

Review Comment:
   I suggest changing the name to make it clear that we are waiting until
all RO transactions with timestamps earlier than or equal to the given
timestamp finish. Something like `roTxnsStartedBeforeLowWatermark()`, or something better.
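A minimal sketch of the semantics the new name should convey, using a hypothetical `RoTxTrackerSketch` with timestamps reduced to `long`; `roTxnsStartedBefore` is a candidate name only. The returned future covers only transactions active at call time, which is presumably why `updateLowWatermark()` raises the lower bound before calling `getFutureAllReadOnlyTransactions`:

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Hypothetical tracker showing the intended semantics: the returned future completes once every
 * read-only transaction with a read timestamp <= the given timestamp has finished.
 */
public final class RoTxTrackerSketch {
    private static final class RoTx {
        final long readTimestamp;

        final CompletableFuture<Void> finishFuture = new CompletableFuture<>();

        RoTx(long readTimestamp) {
            this.readTimestamp = readTimestamp;
        }
    }

    private final Map<UUID, RoTx> active = new ConcurrentHashMap<>();

    UUID begin(long readTimestamp) {
        UUID id = UUID.randomUUID();
        active.put(id, new RoTx(readTimestamp));
        return id;
    }

    void finish(UUID id) {
        RoTx tx = active.remove(id);

        if (tx != null) {
            tx.finishFuture.complete(null);
        }
    }

    /** Waits for RO transactions active at call time whose read timestamp is <= {@code timestamp}. */
    CompletableFuture<Void> roTxnsStartedBefore(long timestamp) {
        List<CompletableFuture<Void>> pending = active.values().stream()
                .filter(tx -> tx.readTimestamp <= timestamp)
                .map(tx -> tx.finishFuture)
                .collect(Collectors.toList());

        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        RoTxTrackerSketch tracker = new RoTxTrackerSketch();

        UUID older = tracker.begin(100);
        tracker.begin(200);

        CompletableFuture<Void> waitFuture = tracker.roTxnsStartedBefore(150);
        System.out.println(waitFuture.isDone()); // false: the tx reading at 100 is still running

        tracker.finish(older);
        System.out.println(waitFuture.isDone()); // true: the tx reading at 200 is not waited for
    }
}
```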



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+    void updateLowWatermark() {
+        inBusyLock(busyLock, () -> {

Review Comment:
   It seems to make sense to create an instance method in this class, named
`inBusyLock`, that accepts just one argument (the closure) and delegates to
`IgniteUtils.inBusyLock(busyLock, closure)`. This would allow writing
`inBusyLock(() -> blabla)` instead of `inBusyLock(busyLock, () -> blabla)`,
avoiding the ceremony of mentioning the `busyLock` variable again and again.
After all, there is just one busy lock per instance.
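A minimal sketch of the suggested instance helper. `IgniteSpinBusyLock` and `IgniteUtils.inBusyLock` are not reproduced here: `SimpleBusyLock` and the static `inBusyLock(lock, closure)` below are hypothetical stand-ins with similar enter/leave/block behavior, and only a `Supplier` form is shown (the PR also passes void lambdas):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public final class BusyLockHelperSketch {
    /** Hypothetical stand-in for IgniteSpinBusyLock. */
    static final class SimpleBusyLock {
        private final AtomicInteger busyCount = new AtomicInteger();

        private final AtomicBoolean blocked = new AtomicBoolean();

        boolean enterBusy() {
            busyCount.incrementAndGet();

            if (blocked.get()) {
                busyCount.decrementAndGet();
                return false;
            }

            return true;
        }

        void leaveBusy() {
            busyCount.decrementAndGet();
        }

        /** Rejects new entries, then spins until in-flight sections have left. */
        void block() {
            blocked.set(true);

            while (busyCount.get() > 0) {
                Thread.onSpinWait();
            }
        }
    }

    /** Hypothetical two-argument helper, playing the role of IgniteUtils.inBusyLock(busyLock, closure). */
    static <T> T inBusyLock(SimpleBusyLock lock, Supplier<T> closure) {
        if (!lock.enterBusy()) {
            // Assumption: a plain exception stands in for Ignite's node-stopping exception.
            throw new IllegalStateException("Component is stopping");
        }

        try {
            return closure.get();
        } finally {
            lock.leaveBusy();
        }
    }

    /** There is exactly one busy lock per instance. */
    private final SimpleBusyLock busyLock = new SimpleBusyLock();

    /** The suggested one-argument instance helper: callers no longer repeat busyLock. */
    private <T> T inBusyLock(Supplier<T> closure) {
        return inBusyLock(busyLock, closure);
    }

    String describe() {
        return inBusyLock(() -> "running");
    }

    void stop() {
        busyLock.block();
    }

    public static void main(String[] args) {
        BusyLockHelperSketch component = new BusyLockHelperSketch();

        System.out.println(component.describe()); // running

        component.stop();

        try {
            component.describe();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Component is stopping
        }
    }
}
```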



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java:
##########
@@ -17,85 +17,84 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * The read-only implementation of an internal transaction.
  */
-public class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
+class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
     /** The read timestamp. */
     private final HybridTimestamp readTimestamp;
 
+    /** Prevents double finish of the transaction. */
+    private final AtomicBoolean finishGuard = new AtomicBoolean();
+
     /**
      * The constructor.
      *
      * @param txManager The tx manager.
      * @param id The id.
      * @param readTimestamp The read timestamp.
      */
-    public ReadOnlyTransactionImpl(
-            TxManager txManager,
-            @NotNull UUID id,
-            HybridTimestamp readTimestamp
-    ) {
+    ReadOnlyTransactionImpl(TxManagerImpl txManager, UUID id, HybridTimestamp readTimestamp) {
         super(txManager, id);
+
         this.readTimestamp = readTimestamp;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean isReadOnly() {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public HybridTimestamp readTimestamp() {
         return readTimestamp;
     }
 
-    /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
         // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId replicationGroupId) {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean assignCommitPartition(ReplicationGroupId replicationGroupId) {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public ReplicationGroupId commitPartition() {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void enlistResultFuture(CompletableFuture<?> resultFuture) {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
+    // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
     protected CompletableFuture<Void> finish(boolean commit) {
-        // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
-        return CompletableFuture.completedFuture(null);
+        if (!finishGuard.compareAndSet(false, true)) {
+            return completedFuture(null);
+        }
+
+        ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(readTimestamp);

Review Comment:
   Once users are able to explicitly specify a read TS for an RO TX, there may be more than one RO TX with the same read TS, so passing just the TS does not seem to be enough.
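
   A minimal sketch of keying the finish futures by transaction id instead of read TS, so several RO transactions can share one read TS and `finish()` would (hypothetically) pass its own id rather than `readTimestamp`. `ReadOnlyTxTracker` and its methods are hypothetical, a plain `long` stands in for `HybridTimestamp`, and races between registration and snapshotting are deliberately not addressed here:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ReadOnlyTxTracker {
    private static final class RoTx {
        final long readTs;
        final CompletableFuture<Void> finishFuture = new CompletableFuture<>();

        RoTx(long readTs) {
            this.readTs = readTs;
        }
    }

    // Keyed by tx id, so any number of transactions may share the same read TS.
    private final Map<UUID, RoTx> txById = new ConcurrentHashMap<>();

    void register(UUID txId, long readTs) {
        if (txById.putIfAbsent(txId, new RoTx(readTs)) != null) {
            throw new IllegalStateException("Duplicate tx id: " + txId);
        }
    }

    /** Completes the future of exactly this transaction; a repeated call is a no-op. */
    void complete(UUID txId) {
        RoTx tx = txById.remove(txId);
        if (tx != null) {
            tx.finishFuture.complete(null);
        }
    }

    /** Completes once all currently registered RO TXs with read TS <= ts have finished. */
    CompletableFuture<Void> allFinishedNotLaterThan(long ts) {
        CompletableFuture<?>[] futures = txById.values().stream()
                .filter(tx -> tx.readTs <= ts)
                .map(tx -> tx.finishFuture)
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(futures);
    }
}
```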



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java:
##########
@@ -108,4 +125,49 @@ public void testId() throws InterruptedException {
         assertTrue(txId3.compareTo(txId2) > 0);
         assertTrue(txId4.compareTo(txId3) > 0);
     }
+
+    @Test
+    void testUpdateLowerBoundToStartNewReadOnlyTransaction() {
+        when(clock.now()).thenReturn(new HybridTimestamp(10, 10));
+
+        txManager.updateLowerBoundToStartNewReadOnlyTransaction(new HybridTimestamp(10, 11));
+
+        IgniteInternalException exception = assertThrows(IgniteInternalException.class, () -> txManager.begin(true));
+
+        assertEquals(Transactions.TX_READ_ONLY_CREATING_ERR, exception.code());
+
+        // Let's check the removed lower bound.
+        txManager.updateLowerBoundToStartNewReadOnlyTransaction(null);
+
+        assertDoesNotThrow(() -> txManager.begin(true));
+    }
+
+    @Test
+    void testGetFutureReadOnlyTransactions() {
+        // Let's check the absence of transactions.
+        assertThat(txManager.getFutureAllReadOnlyTransactions(clock.now()), willSucceedFast());
+
+        InternalTransaction rwTx0 = txManager.begin(false);
+
+        InternalTransaction roTx0 = txManager.begin(true);
+        InternalTransaction roTx1 = txManager.begin(true);
+
+        CompletableFuture<Void> readOnlyTxsFutures = txManager.getFutureAllReadOnlyTransactions(roTx1.readTimestamp());

Review Comment:
   ```suggestion
           CompletableFuture<Void> readOnlyTxsFuture = txManager.getFutureAllReadOnlyTransactions(roTx1.readTimestamp());
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -74,27 +86,45 @@ public TxManagerImpl(ReplicaService replicaService, LockManager lockManager, Hyb
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
         return begin(false);
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin(boolean readOnly) {
         UUID txId = Timestamp.nextVersion().toUuid();
 
-        return readOnly ? new ReadOnlyTransactionImpl(this, txId, clock.now()) : new ReadWriteTransactionImpl(this, txId);
+        if (!readOnly) {
+            return new ReadWriteTransactionImpl(this, txId);
+        }
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        readOnlyTxFutureByReadTs.compute(readTimestamp, (timestamp, readOnlyTxFuture) -> {
+            assert readOnlyTxFuture == null : "previous transaction has not completed yet: " + readTimestamp;
+
+            HybridTimestamp lowerBound = lowerBoundTsToStartNewReadOnlyTx.get();
+
+            if (lowerBound != null && readTimestamp.compareTo(lowerBound) <= 0) {
+                throw new IgniteInternalException(
+                        TX_READ_ONLY_CREATING_ERR,
+                        "Timestamp read-only transaction must be greater than the lower bound: [txTimestamp={}, lowerBound={}]",
+                        readTimestamp, lowerBound
+                );

Review Comment:
   Consider the following scenario:
   
   1. One thread (A) starts an RO TX. It is inside `readOnlyTxFutureByReadTs.compute()`, has already read the old lower bound and successfully validated the read TS (T1), but has not returned from the closure yet.
   2. Another thread (B) raises the lower bound to a value for which T1 is no longer valid.
   3. Then A finishes adding the new RO TX, which is already too old.
   
   Another scenario: thread A begins adding a transaction with TS T1; then thread B calls `getFutureAllReadOnlyTransactions()` for T2 > T1 (and does not see the T1 transaction yet); then the addition completes. The T1 transaction may run longer than the transactions that `getFutureAllReadOnlyTransactions()` did see, so its result may complete before the T1 transaction completes, which seems to break the contract.
   
   We probably need synchronization that coordinates updates of both the future map and the lower bound (and also the reads of the map that are used to build the compound future).
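
   One possible shape for that synchronization, as a sketch only: a `ReentrantReadWriteLock` where `begin()` validates and registers under the read lock (so concurrent begins do not block each other), while raising the lower bound and snapshotting the futures happen together under the write lock. Both scenarios above then become impossible, because a begin is either fully visible to the snapshot or validated against the new bound. `RoTxRegistry`, its methods, and the `long` timestamps are hypothetical stand-ins for the PR's types:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RoTxRegistry {
    private static final class TxRecord {
        final long readTs;
        final CompletableFuture<Void> finished = new CompletableFuture<>();

        TxRecord(long readTs) {
            this.readTs = readTs;
        }
    }

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<UUID, TxRecord> txs = new ConcurrentHashMap<>();

    /** Exclusive lower bound; read under the read lock, written under the write lock. */
    private long lowerBound = Long.MIN_VALUE;

    void begin(UUID txId, long readTs) {
        lock.readLock().lock();
        try {
            // Validation and registration are atomic with respect to raiseLowerBoundAndAwait().
            if (readTs <= lowerBound) {
                throw new IllegalStateException("Read TS " + readTs + " is not later than " + lowerBound);
            }
            txs.put(txId, new TxRecord(readTs));
        } finally {
            lock.readLock().unlock();
        }
    }

    void finish(UUID txId) {
        TxRecord tx = txs.remove(txId);
        if (tx != null) {
            tx.finished.complete(null);
        }
    }

    CompletableFuture<Void> raiseLowerBoundAndAwait(long newLowerBound) {
        lock.writeLock().lock();
        try {
            lowerBound = Math.max(lowerBound, newLowerBound);
            // No begin() can be half-way through here, so the snapshot is complete.
            CompletableFuture<?>[] pending = txs.values().stream()
                    .filter(tx -> tx.readTs <= newLowerBound)
                    .map(tx -> tx.finished)
                    .toArray(CompletableFuture<?>[]::new);
            return CompletableFuture.allOf(pending);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```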



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ */
+public class LowWatermarkManager implements ManuallyCloseable {

Review Comment:
   Our 'managers' are usually `IgniteComponent`s, not `ManuallyCloseable`s. Is there a specific reason to prefer the latter here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ */
+public class LowWatermarkManager implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkManager.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final MvGc mvGc;
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private final AtomicReference<HybridTimestamp> lowWatermark = new AtomicReference<>();
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     */
+    public LowWatermarkManager(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            MvGc mvGc
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.mvGc = mvGc;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    public void start() {
+        inBusyLock(busyLock, () -> {
+            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
+                    .thenApply(vaultEntry -> inBusyLock(busyLock, () -> {
+                        if (vaultEntry == null) {
+                            scheduleUpdateLowWatermarkBusy();
+
+                            return null;
+                        }
+
+                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
+
+                        this.lowWatermark.set(lowWatermark);
+
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+
+                        return lowWatermark;
+                    }))
+                    .whenComplete((lowWatermark, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) {
+                                LOG.error("Error getting low watermark", throwable);
+
+                                // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
+                            }
+                        } else {
+                            LOG.info(
+                                    "Low watermark has been successfully got from the vault and is scheduled to be updated: {}",
+                                    lowWatermark
+                            );
+                        }
+                    });
+        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        ScheduledFuture<?> lastScheduledTaskFuture = this.lastScheduledTaskFuture.get();
+
+        if (lastScheduledTaskFuture != null) {
+            lastScheduledTaskFuture.cancel(true);
+        }
+
+        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Returns the current low watermark, {@code null} means no low watermark has been assigned yet.
+     */
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return lowWatermark.get();
+    }
+
+    void updateLowWatermark() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = createNewLowWatermarkCandidate();
+
+            txManager.updateLowerBoundToStartNewReadOnlyTransaction(lowWatermarkCandidate);
+
+            txManager.getFutureAllReadOnlyTransactions(lowWatermarkCandidate)
+                    .thenCompose(unused -> inBusyLock(

Review Comment:
   How about adding a comment saying that at this point the candidate is being promoted to the new LWM? Otherwise, it's a bit puzzling why a candidate is written to the Vault as the new LWM.
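
   For illustration, a sketch of where such a comment could go. The collaborators are reduced to hypothetical functional parameters, a `long` stands in for `HybridTimestamp`, and the step order (forbid older RO TXs, wait for the existing ones, then persist and publish) only approximates the reviewed code, so treat it as an assumption:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;

class LowWatermarkPromotionSketch {
    private final LongConsumer forbidReadOnlyTxsNotLaterThan;
    private final LongFunction<CompletableFuture<Void>> awaitReadOnlyTxsNotLaterThan;
    private final LongFunction<CompletableFuture<Void>> persistLowWatermark;

    final AtomicLong lowWatermark = new AtomicLong(Long.MIN_VALUE);

    LowWatermarkPromotionSketch(
            LongConsumer forbidReadOnlyTxsNotLaterThan,
            LongFunction<CompletableFuture<Void>> awaitReadOnlyTxsNotLaterThan,
            LongFunction<CompletableFuture<Void>> persistLowWatermark
    ) {
        this.forbidReadOnlyTxsNotLaterThan = forbidReadOnlyTxsNotLaterThan;
        this.awaitReadOnlyTxsNotLaterThan = awaitReadOnlyTxsNotLaterThan;
        this.persistLowWatermark = persistLowWatermark;
    }

    CompletableFuture<Void> updateLowWatermark(long candidate) {
        forbidReadOnlyTxsNotLaterThan.accept(candidate);

        return awaitReadOnlyTxsNotLaterThan.apply(candidate)
                // All RO transactions that could read at or below the candidate have finished,
                // so from here on the candidate is promoted to be the new low watermark.
                .thenCompose(unused -> persistLowWatermark.apply(candidate))
                .thenRun(() -> lowWatermark.set(candidate));
    }
}
```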



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ */
+public class LowWatermarkManager implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkManager.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final MvGc mvGc;
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private final AtomicReference<HybridTimestamp> lowWatermark = new AtomicReference<>();
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     */
+    public LowWatermarkManager(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            MvGc mvGc
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.mvGc = mvGc;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    public void start() {
+        inBusyLock(busyLock, () -> {
+            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
+                    .thenApply(vaultEntry -> inBusyLock(busyLock, () -> {
+                        if (vaultEntry == null) {
+                            scheduleUpdateLowWatermarkBusy();
+
+                            return null;
+                        }
+
+                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
+
+                        this.lowWatermark.set(lowWatermark);
+
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+
+                        return lowWatermark;
+                    }))
+                    .whenComplete((lowWatermark, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) {
+                                LOG.error("Error getting low watermark", throwable);
+
+                                // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler

Review Comment:
   As we don't fail the node yet, should we try to schedule another attempt to update the watermark? It would be puzzling if something failed during start in tests and then nothing got updated.
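
   If a retry is preferred, a minimal sketch of one possible approach: reschedule the failed attempt on a `ScheduledExecutorService` a limited number of times, and stop early on errors that a predicate marks as non-retryable (e.g. node stopping). `RetryingStarter` and its parameters are hypothetical. It also illustrates a detail worth double-checking in the reviewed code: a failure that propagates through `thenApply` reaches `whenComplete` wrapped in a `CompletionException`, so a check like `throwable instanceof NodeStoppingException` may not match unless the cause is unwrapped first.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

class RetryingStarter {
    private final ScheduledExecutorService scheduler;
    private final long retryDelayMillis;

    RetryingStarter(ScheduledExecutorService scheduler, long retryDelayMillis) {
        this.scheduler = scheduler;
        this.retryDelayMillis = retryDelayMillis;
    }

    /** Runs the attempt up to maxAttempts times, pausing retryDelayMillis after each failure. */
    CompletableFuture<Void> start(Supplier<CompletableFuture<Void>> attempt, int maxAttempts, Predicate<Throwable> retryable) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        tryOnce(attempt, maxAttempts, retryable, result);
        return result;
    }

    private void tryOnce(
            Supplier<CompletableFuture<Void>> attempt,
            int attemptsLeft,
            Predicate<Throwable> retryable,
            CompletableFuture<Void> result
    ) {
        CompletableFuture<Void> future;
        try {
            future = attempt.get();
        } catch (RuntimeException e) {
            future = CompletableFuture.failedFuture(e);
        }

        future.whenComplete((unused, err) -> {
            if (err == null) {
                result.complete(null);
                return;
            }
            // Failures from dependent stages arrive wrapped in CompletionException.
            Throwable cause = unwrap(err);
            if (attemptsLeft <= 1 || !retryable.test(cause)) {
                result.completeExceptionally(cause);
                return;
            }
            // Instead of silently giving up, try again after a delay.
            scheduler.schedule(
                    () -> tryOnce(attempt, attemptsLeft - 1, retryable, result),
                    retryDelayMillis,
                    TimeUnit.MILLISECONDS
            );
        });
    }

    private static Throwable unwrap(Throwable t) {
        return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
    }
}
```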



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -124,4 +128,20 @@ CompletableFuture<Void> cleanup(
      */
     @TestOnly
     int finished();
+
+    /**
+     * Updates the lower bound (exclusive) of the timestamp to start new read-only transactions.
+     *
+     * <p>All new read-only transactions will have to start strictly greater this lower bound.

Review Comment:
   ```suggestion
        * <p>All new read-only transactions will have to start with timestamps strictly later than this lower bound.
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -74,27 +86,45 @@ public TxManagerImpl(ReplicaService replicaService, LockManager lockManager, Hyb
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
         return begin(false);
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin(boolean readOnly) {
         UUID txId = Timestamp.nextVersion().toUuid();
 
-        return readOnly ? new ReadOnlyTransactionImpl(this, txId, clock.now()) : new ReadWriteTransactionImpl(this, txId);
+        if (!readOnly) {
+            return new ReadWriteTransactionImpl(this, txId);
+        }
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        readOnlyTxFutureByReadTs.compute(readTimestamp, (timestamp, readOnlyTxFuture) -> {
+            assert readOnlyTxFuture == null : "previous transaction has not completed yet: " + readTimestamp;
+
+            HybridTimestamp lowerBound = lowerBoundTsToStartNewReadOnlyTx.get();
+
+            if (lowerBound != null && readTimestamp.compareTo(lowerBound) <= 0) {
+                throw new IgniteInternalException(
+                        TX_READ_ONLY_CREATING_ERR,

Review Comment:
   Do we need such a general error code ('some error while creating an RO transaction')? Wouldn't it make sense to reserve a specific error code for this specific error: 'the RO transaction read TS is too old'?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -124,4 +128,20 @@ CompletableFuture<Void> cleanup(
      */
     @TestOnly
     int finished();
+
+    /**
+     * Updates the lower bound (exclusive) of the timestamp to start new read-only transactions.
+     *
+     * <p>All new read-only transactions will have to start strictly greater this lower bound.
+     *
+     * @param lowerBound New lower bound, {@code null} if there is no lower bound.
+     */
+    void updateLowerBoundToStartNewReadOnlyTransaction(@Nullable HybridTimestamp lowerBound);

Review Comment:
   How about a different name, like `forbidReadOnlyTransactionsNotLaterThan()`? Just a stylistic matter.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkManagerTest.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.table.distributed.LowWatermarkManager.LOW_WATERMARK_VAULT_KEY;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InOrder;
+
+/**
+ * For {@link LowWatermarkManager} testing.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class LowWatermarkManagerTest {
+    @InjectConfiguration
+    private LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock = spy(new HybridClockImpl());
+
+    private final TxManager txManager = mock(TxManager.class);
+
+    private final VaultManager vaultManager = mock(VaultManager.class);
+
+    private final MvGc mvGc = mock(MvGc.class);
+
+    private LowWatermarkManager lowWatermarkManager;
+
+    @BeforeEach
+    void setUp() {
+        lowWatermarkManager = new LowWatermarkManager("test", lowWatermarkConfig, clock, txManager, vaultManager, mvGc);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        lowWatermarkManager.close();
+    }
+
+    @Test
+    void testStart() {
+        // Let's check the start with no low watermark in vault.
+        when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)).thenReturn(completedFuture(null));
+
+        lowWatermarkManager.start();
+
+        verify(mvGc, never()).updateLowWatermark(any(HybridTimestamp.class));
+        assertNull(lowWatermarkManager.getLowWatermark());
+
+        // Let's check the start with an existing low watermark in vault.
+        HybridTimestamp lowWatermark = new HybridTimestamp(10, 10);
+
+        when(vaultManager.get(LOW_WATERMARK_VAULT_KEY))
+                .thenReturn(completedFuture(new VaultEntry(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermark))));
+
+        lowWatermarkManager.start();
+
+        verify(mvGc).updateLowWatermark(lowWatermark);
+        assertEquals(lowWatermark, lowWatermarkManager.getLowWatermark());
+    }
+
+    @Test
+    void testCreateNewLowWatermarkCandidate() {
+        when(clock.now()).thenReturn(new HybridTimestamp(1_000, 500));
+
+        assertThat(lowWatermarkConfig.dataAvailabilityTime().update(100L), willSucceedFast());
+
+        HybridTimestamp newLowWatermarkCandidate = lowWatermarkManager.createNewLowWatermarkCandidate();
+
+        assertThat(newLowWatermarkCandidate.getPhysical(), lessThanOrEqualTo(1_000L - 100));
+        assertEquals(500L, newLowWatermarkCandidate.getLogical());
+    }
+
+    @Test
+    void testUpdateLowWatermark() {
+        HybridTimestamp now = clock.now();
+
+        when(clock.now()).thenReturn(now);
+
+        when(txManager.getFutureAllReadOnlyTransactions(any(HybridTimestamp.class))).thenReturn(completedFuture(null));
+
+        when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(completedFuture(null));
+
+        // Made a predictable candidate to make it easier to test.

Review Comment:
   ```suggestion
           // Make a predictable candidate to make it easier to test.
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -74,27 +86,45 @@ public TxManagerImpl(ReplicaService replicaService, LockManager lockManager, Hyb
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
         return begin(false);
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin(boolean readOnly) {
         UUID txId = Timestamp.nextVersion().toUuid();
 
-        return readOnly ? new ReadOnlyTransactionImpl(this, txId, clock.now()) : new ReadWriteTransactionImpl(this, txId);
+        if (!readOnly) {
+            return new ReadWriteTransactionImpl(this, txId);
+        }
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        readOnlyTxFutureByReadTs.compute(readTimestamp, (timestamp, readOnlyTxFuture) -> {

Review Comment:
   This is probably not implemented yet, but I think it will become possible for the user to explicitly specify the read TS they want for an RO transaction. This probably means that the read TS should not be used to identify a transaction: a whole collection of TXs might correspond to a given read TS.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java:
##########
@@ -17,85 +17,84 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * The read-only implementation of an internal transaction.
  */
-public class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
+class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
     /** The read timestamp. */
     private final HybridTimestamp readTimestamp;
 
+    /** Prevents double finish of the transaction. */
+    private final AtomicBoolean finishGuard = new AtomicBoolean();
+
     /**
      * The constructor.
      *
      * @param txManager The tx manager.
      * @param id The id.
      * @param readTimestamp The read timestamp.
      */
-    public ReadOnlyTransactionImpl(
-            TxManager txManager,
-            @NotNull UUID id,
-            HybridTimestamp readTimestamp
-    ) {
+    ReadOnlyTransactionImpl(TxManagerImpl txManager, UUID id, HybridTimestamp readTimestamp) {
         super(txManager, id);
+
         this.readTimestamp = readTimestamp;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean isReadOnly() {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public HybridTimestamp readTimestamp() {
         return readTimestamp;
     }
 
-    /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
         // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId replicationGroupId) {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean assignCommitPartition(ReplicationGroupId replicationGroupId) {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public ReplicationGroupId commitPartition() {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void enlistResultFuture(CompletableFuture<?> resultFuture) {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
+    // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
     protected CompletableFuture<Void> finish(boolean commit) {
-        // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
-        return CompletableFuture.completedFuture(null);
+        if (!finishGuard.compareAndSet(false, true)) {
+            return completedFuture(null);
+        }
+
+        ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(readTimestamp);
+
+        return completedFuture(null);

Review Comment:
   Should we return the corresponding future from `TxManagerImpl` (the one that gets completed by `completeReadOnlyTransactionFuture()`), or is it unrelated?
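One way the question could be answered, sketched under assumptions: the transaction keeps a reference to the very future the manager tracks for it, and `finish()` returns that future on the first call and on every repeated call, instead of a fresh `completedFuture(null)`. `SketchTxManager`, `SketchReadOnlyTx`, and `registerReadOnly` are hypothetical stand-ins for `TxManagerImpl` and `ReadOnlyTransactionImpl`; the map name and `finishGuard` mirror the PR, and a `long` replaces `HybridTimestamp`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for TxManagerImpl; a long stands in for HybridTimestamp. */
class SketchTxManager {
    /** One future per read timestamp, mirroring readOnlyTxFutureByReadTs in the PR. */
    final Map<Long, CompletableFuture<Void>> readOnlyTxFutureByReadTs = new ConcurrentHashMap<>();

    /** Hypothetical registration step performed when an RO transaction begins. */
    CompletableFuture<Void> registerReadOnly(long readTs) {
        CompletableFuture<Void> fut = new CompletableFuture<>();
        if (readOnlyTxFutureByReadTs.putIfAbsent(readTs, fut) != null) {
            // Mirrors the PR's assert: at most one RO transaction per read timestamp.
            throw new IllegalStateException("Previous transaction has not completed yet: " + readTs);
        }
        return fut;
    }

    void completeReadOnlyTransactionFuture(long readTs) {
        CompletableFuture<Void> fut = readOnlyTxFutureByReadTs.get(readTs);
        if (fut != null) {
            // Complete first, then remove only this exact future.
            fut.complete(null);
            readOnlyTxFutureByReadTs.remove(readTs, fut);
        }
    }
}

/** Hypothetical stand-in for ReadOnlyTransactionImpl. */
class SketchReadOnlyTx {
    private final SketchTxManager txManager;
    private final long readTimestamp;
    /** The very future the manager tracks for this transaction. */
    private final CompletableFuture<Void> finishFuture;
    /** Prevents double finish, as in the PR. */
    private final AtomicBoolean finishGuard = new AtomicBoolean();

    SketchReadOnlyTx(SketchTxManager txManager, long readTimestamp) {
        this.txManager = txManager;
        this.readTimestamp = readTimestamp;
        this.finishFuture = txManager.registerReadOnly(readTimestamp);
    }

    CompletableFuture<Void> finish() {
        if (finishGuard.compareAndSet(false, true)) {
            txManager.completeReadOnlyTransactionFuture(readTimestamp);
        }
        // First and repeated calls alike return the manager's future.
        return finishFuture;
    }
}
```

The benefit of this shape is that whoever waits on `finish()` and whoever waits through the manager observe the same completion, so they cannot disagree.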



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -74,27 +86,45 @@ public TxManagerImpl(ReplicaService replicaService, LockManager lockManager, Hyb
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
         return begin(false);
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin(boolean readOnly) {
         UUID txId = Timestamp.nextVersion().toUuid();
 
-        return readOnly ? new ReadOnlyTransactionImpl(this, txId, clock.now()) : new ReadWriteTransactionImpl(this, txId);
+        if (!readOnly) {
+            return new ReadWriteTransactionImpl(this, txId);
+        }
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        readOnlyTxFutureByReadTs.compute(readTimestamp, (timestamp, readOnlyTxFuture) -> {
+            assert readOnlyTxFuture == null : "previous transaction has not completed yet: " + readTimestamp;

Review Comment:
   In the current implementation, each tx seems to get a distinct read timestamp, so why the check?
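The reviewer's point holds if `clock.now()` never returns the same value twice. A hybrid logical clock is normally designed to give that guarantee on a single node; the sketch below illustrates the idea with a hypothetical, simplified `MonotonicClock` (not Ignite's `HybridClock`), assuming a single `long` in place of the physical-time-plus-logical-counter pair of a real `HybridTimestamp`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified monotonic clock. A single long replaces the
 * physical-time-plus-logical-counter pair of a real HybridTimestamp.
 */
class MonotonicClock {
    private final AtomicLong last = new AtomicLong();
    private final LongSupplier physicalTime;

    MonotonicClock(LongSupplier physicalTime) {
        this.physicalTime = physicalTime;
    }

    /** Returns a value strictly greater than any returned before, even if physical time stalls or goes back. */
    long now() {
        long physical = physicalTime.getAsLong();
        // updateAndGet may retry the function under contention, so it must be side-effect free.
        return last.updateAndGet(prev -> Math.max(prev + 1, physical));
    }

    public static void main(String[] args) {
        // A frozen physical clock: every call sees the same wall-clock value.
        MonotonicClock clock = new MonotonicClock(() -> 1_000L);
        System.out.println(clock.now()); // 1000
        System.out.println(clock.now()); // 1001
        System.out.println(clock.now()); // 1002
    }
}
```

Under that assumption, two `begin(true)` calls cannot produce the same read timestamp, so the `assert` inside the `compute()` lambda could only fire once read timestamps stop coming exclusively from the clock, for example if they become user-supplied.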



-- 
This is an automated message from the Apache Git Service.