This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a59af8a10 IGNITE-20668 Increase wait after a DDL to account for idle 
safe-time propagation period (#2703)
0a59af8a10 is described below

commit 0a59af8a100e1e8e05e541c701aaf8da3ab818f2
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Wed Oct 18 13:17:24 2023 +0400

    IGNITE-20668 Increase wait after a DDL to account for idle safe-time 
propagation period (#2703)
---
 .../internal/catalog/CatalogManagerImpl.java       | 24 ++++--
 .../internal/catalog/CatalogManagerSelfTest.java   |  4 +-
 .../java/org/apache/ignite/internal/Hacks.java     | 27 +++++++
 .../testframework/BaseIgniteAbstractTest.java      |  2 +
 .../ignite/internal/replicator/ReplicaManager.java | 56 +++++++++++++-
 .../apache/ignite/internal/jdbc/ItJdbcTest.java    |  2 +-
 .../internal/readonly/ItReadOnlyTxInPastTest.java  | 86 ++++++++++++++++++++++
 .../rebalance/ItRebalanceDistributedTest.java      | 11 ++-
 .../rebalance/ItRebalanceRecoveryTest.java         |  3 +
 .../runner/app/ItIgniteNodeRestartTest.java        | 11 ++-
 .../ignite/internal/sql/api/ItSqlApiBaseTest.java  |  9 ++-
 .../org/apache/ignite/internal/app/IgniteImpl.java | 16 +++-
 .../java/org/apache/ignite/internal/Cluster.java   | 10 +++
 .../LowWatermarkConfigurationSchema.java           |  4 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 49 ++++++++++--
 .../apache/ignite/internal/tx/TxManagerTest.java   | 21 +++---
 16 files changed, 291 insertions(+), 44 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index aab4f0f777..9a6ead5252 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -90,6 +90,8 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
     /** Safe time to wait before new Catalog version activation. */
     private static final int DEFAULT_DELAY_DURATION = 0;
 
+    private static final int 
DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD = 0;
+
     /** Initial update token for a catalog descriptor, this token is valid 
only before the first call of
      * {@link UpdateEntry#applyUpdate(Catalog, long)}.
      *
@@ -115,27 +117,35 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
 
     private final LongSupplier delayDurationMsSupplier;
 
+    private final LongSupplier 
partitionIdleSafeTimePropagationPeriodMsSupplier;
+
     /**
      * Constructor.
      */
     public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
-        this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
+        this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION, 
DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD);
     }
 
     /**
      * Constructor.
      */
-    CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long 
delayDurationMs) {
-        this(updateLog, clockWaiter, () -> delayDurationMs);
+    CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long 
delayDurationMs, long partitionIdleSafeTimePropagationPeriod) {
+        this(updateLog, clockWaiter, () -> delayDurationMs, () -> 
partitionIdleSafeTimePropagationPeriod);
     }
 
     /**
      * Constructor.
      */
-    public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, 
LongSupplier delayDurationMsSupplier) {
+    public CatalogManagerImpl(
+            UpdateLog updateLog,
+            ClockWaiter clockWaiter,
+            LongSupplier delayDurationMsSupplier,
+            LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier
+    ) {
         this.updateLog = updateLog;
         this.clockWaiter = clockWaiter;
         this.delayDurationMsSupplier = delayDurationMsSupplier;
+        this.partitionIdleSafeTimePropagationPeriodMsSupplier = 
partitionIdleSafeTimePropagationPeriodMsSupplier;
     }
 
     @Override
@@ -415,8 +425,12 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
                     HybridTimestamp clusterWideEnsuredActivationTs = 
activationTs.addPhysicalTime(
                             HybridTimestamp.maxClockSkew()
                     );
+                    // TODO: this addition has to be removed when IGNITE-20378 
is implemented.
+                    HybridTimestamp tsSafeForRoReadingInPastOptimization = 
clusterWideEnsuredActivationTs.addPhysicalTime(
+                            
partitionIdleSafeTimePropagationPeriodMsSupplier.getAsLong()
+                    );
 
-                    return clockWaiter.waitFor(clusterWideEnsuredActivationTs);
+                    return 
clockWaiter.waitFor(tsSafeForRoReadingInPastOptimization);
                 });
     }
 
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 01f78d5125..fbaae7dbfb 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -1046,7 +1046,7 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
     public void catalogActivationTime() throws Exception {
         long delayDuration = TimeUnit.DAYS.toMillis(365);
 
-        CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, 
clockWaiter, delayDuration);
+        CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, 
clockWaiter, delayDuration, 0);
 
         manager.start();
 
@@ -1444,7 +1444,7 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
 
         HybridTimestamp startTs = clock.now();
 
-        CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, 
clockWaiter, delayDuration);
+        CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, 
clockWaiter, delayDuration, 0);
 
         manager.start();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java 
b/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java
new file mode 100644
index 0000000000..b9cbdc1498
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ * Contains hacks needed for the whole codebase. Should be removed as quickly 
as possible.
+ */
+public class Hacks {
+    // TODO: Remove after IGNITE-20499 is fixed.
+    /** Name of the property overriding idle safe time propagation period (in 
milliseconds). */
+    public static final String 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY = 
"IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS";
+}
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
index 8d3338cc9a..c811c4d11d 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.testframework;
 
+import static 
org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.IGNITE_SENSITIVE_DATA_LOGGING;
 import static org.apache.ignite.internal.lang.IgniteSystemProperties.getString;
 import static org.apache.ignite.internal.util.IgniteUtils.monotonicMs;
@@ -38,6 +39,7 @@ import org.mockito.Mockito;
  * Ignite base test class.
  */
 @ExtendWith(SystemPropertiesExtension.class)
+@WithSystemProperty(key = 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "200")
 public abstract class BaseIgniteAbstractTest {
     /** Logger. */
     protected final IgniteLogger log = Loggers.forClass(getClass());
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 55b03f36e7..eb525e7f1a 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.replicator;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.io.IOException;
@@ -36,6 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -80,8 +82,8 @@ import org.jetbrains.annotations.TestOnly;
  * This class allow to start/stop/get a replica.
  */
 public class ReplicaManager implements IgniteComponent {
-    /** Idle safe time propagation period. */
-    public static final int IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS = 
1000;
+    /** Default Idle safe time propagation period for tests. */
+    public static final int 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS = 1000;
 
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(ReplicaManager.class);
@@ -115,6 +117,8 @@ public class ReplicaManager implements IgniteComponent {
     /** Placement driver. */
     private final PlacementDriver placementDriver;
 
+    private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
+
     /** Replicas. */
     private final ConcurrentHashMap<ReplicationGroupId, 
CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>();
 
@@ -141,7 +145,9 @@ public class ReplicaManager implements IgniteComponent {
      * @param cmgMgr Cluster group manager.
      * @param clock A hybrid logical clock.
      * @param messageGroupsToHandle Message handlers.
+     * @param placementDriver A placement driver.
      */
+    @TestOnly
     public ReplicaManager(
             String nodeName,
             ClusterService clusterNetSvc,
@@ -149,6 +155,37 @@ public class ReplicaManager implements IgniteComponent {
             HybridClock clock,
             Set<Class<?>> messageGroupsToHandle,
             PlacementDriver placementDriver
+    ) {
+        this(
+                nodeName,
+                clusterNetSvc,
+                cmgMgr,
+                clock,
+                messageGroupsToHandle,
+                placementDriver,
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
+        );
+    }
+
+    /**
+     * Constructor for a replica service.
+     *
+     * @param nodeName Node name.
+     * @param clusterNetSvc Cluster network service.
+     * @param cmgMgr Cluster group manager.
+     * @param clock A hybrid logical clock.
+     * @param messageGroupsToHandle Message handlers.
+     * @param placementDriver A placement driver.
+     * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe 
time propagation period in ms.
+     */
+    public ReplicaManager(
+            String nodeName,
+            ClusterService clusterNetSvc,
+            ClusterManagementGroupManager cmgMgr,
+            HybridClock clock,
+            Set<Class<?>> messageGroupsToHandle,
+            PlacementDriver placementDriver,
+            LongSupplier idleSafeTimePropagationPeriodMsSupplier
     ) {
         this.clusterNetSvc = clusterNetSvc;
         this.cmgMgr = cmgMgr;
@@ -157,6 +194,7 @@ public class ReplicaManager implements IgniteComponent {
         this.handler = this::onReplicaMessageReceived;
         this.placementDriverMessageHandler = 
this::onPlacementDriverMessageReceived;
         this.placementDriver = placementDriver;
+        this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
 
         scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
                 1,
@@ -506,7 +544,7 @@ public class ReplicaManager implements IgniteComponent {
         scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate(
                 this::idleSafeTimeSync,
                 0,
-                IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+                idleSafeTimePropagationPeriodMsSupplier.getAsLong(),
                 TimeUnit.MILLISECONDS
         );
 
@@ -675,4 +713,16 @@ public class ReplicaManager implements IgniteComponent {
     public Set<ReplicationGroupId> startedGroups() {
         return replicas.keySet();
     }
+
+    /**
+     * TODO: to be removed after IGNITE-20499 is fixed. This was introduced in 
a rush because of a burning release, should be fixed asap.
+     */
+    public static long idleSafeTimePropagationPeriodMs() {
+        return Long.parseLong(
+                System.getProperty(
+                        
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY,
+                        
Integer.toString(DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS)
+                )
+        );
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
index 4bc248f033..e4118682a3 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
@@ -69,7 +69,7 @@ class ItJdbcTest extends IgniteIntegrationTest {
         @BeforeAll
         void setUp(TestInfo testInfo, @WorkDirectory Path workDir) {
             cluster = new Cluster(testInfo, workDir);
-            cluster.startAndInit(1, new int[]{ 0 }, builder -> 
builder.clusterConfiguration(
+            cluster.startAndInit(1, builder -> builder.clusterConfiguration(
                     "{\n"
                             + "  \"security\": {\n"
                             + "  \"enabled\": true,\n"
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
new file mode 100644
index 0000000000..4b1d651127
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.readonly;
+
+import static 
org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests about read-only transactions in the past.
+ */
+@SuppressWarnings("resource")
+// Setting this to 1 second so that an RO tx has a potential to look before a 
table was created.
+@WithSystemProperty(key = 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "1000")
+class ItReadOnlyTxInPastTest extends ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "test";
+
+    @Override
+    protected int initialNodes() {
+        return 0;
+    }
+
+    @BeforeEach
+    void prepareCluster() {
+        cluster.startAndInit(1);
+
+        cluster.doInSession(0, session -> {
+            executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY 
KEY, val varchar)", session);
+        });
+    }
+
+    /**
+     * Make sure that an explicit RO transaction does not look too far in the 
past (where the corresponding
+     * table did not yet exist) even when the 'look in the past' optimization 
is enabled.
+     */
+    @Test
+    void explicitReadOnlyTxDoesNotLookBeforeTableCreation() {
+        IgniteImpl node = cluster.node(0);
+
+        long count = node.transactions().runInTransaction(tx -> {
+            return cluster.doInSession(0, session -> {
+                try (ResultSet<SqlRow> resultSet = session.execute(tx, "SELECT 
COUNT(*) FROM " + TABLE_NAME)) {
+                    return resultSet.next().longValue(0);
+                }
+            });
+        }, new TransactionOptions().readOnly(true));
+
+        assertThat(count, is(0L));
+    }
+
+    /**
+     * Make sure that an implicit RO transaction does not look too far in the 
past (where the corresponding
+     * table did not yet exist) even when the 'look in the past' optimization 
is enabled.
+     */
+    @Test
+    void implicitReadOnlyTxDoesNotLookBeforeTableCreation() {
+        long count = cluster.query(0, "SELECT COUNT(*) FROM " + TABLE_NAME, rs 
-> rs.next().longValue(0));
+
+        assertThat(count, is(0L));
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 488a218233..c21f4d7bd2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -821,13 +821,16 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     new NodeAttributesCollector(nodeAttributes),
                     new TestConfigurationValidator());
 
+            LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = () 
-> 10L;
+
             replicaManager = spy(new ReplicaManager(
                     name,
                     clusterService,
                     cmgManager,
                     hybridClock,
                     Set.of(TableMessageGroup.class, TxMessageGroup.class),
-                    placementDriver
+                    placementDriver,
+                    partitionIdleSafeTimePropagationPeriodMsSupplier
             ));
 
             ReplicaService replicaSvc = new ReplicaService(
@@ -841,7 +844,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     hybridClock,
                     new TransactionIdGenerator(addr.port()),
                     () -> clusterService.topologyService().localMember().id(),
-                    placementDriver
+                    placementDriver,
+                    partitionIdleSafeTimePropagationPeriodMsSupplier
             );
 
             String nodeName = clusterService.nodeName();
@@ -915,7 +919,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
             catalogManager = new CatalogManagerImpl(
                     new UpdateLogImpl(metaStorageManager),
                     clockWaiter,
-                    delayDurationMsSupplier
+                    delayDurationMsSupplier,
+                    partitionIdleSafeTimePropagationPeriodMsSupplier
             );
 
             schemaManager = new CatalogSchemaManager(registry, catalogManager, 
metaStorageManager);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
index f1af298c4b..20b290dc6a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.rebalance;
 
+import static 
org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -26,11 +27,13 @@ import 
org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.junit.jupiter.api.Test;
 
 /**
  * Tests for recovery of the rebalance procedure.
  */
+@WithSystemProperty(key = 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "1000")
 public class ItRebalanceRecoveryTest extends ClusterPerTestIntegrationTest {
     @Override
     protected int initialNodes() {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 946067cb40..4b4bf37a36 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -264,13 +264,16 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 new NodeAttributesCollector(nodeAttributes),
                 new TestConfigurationValidator());
 
+        LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = () -> 
10L;
+
         ReplicaManager replicaMgr = new ReplicaManager(
                 name,
                 clusterSvc,
                 cmgManager,
                 hybridClock,
                 Set.of(TableMessageGroup.class, TxMessageGroup.class),
-                placementDriver
+                placementDriver,
+                partitionIdleSafeTimePropagationPeriodMsSupplier
         );
 
         var replicaService = new ReplicaService(clusterSvc.messagingService(), 
hybridClock);
@@ -285,7 +288,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 hybridClock,
                 new TransactionIdGenerator(idx),
                 () -> clusterSvc.topologyService().localMember().id(),
-                placementDriver
+                placementDriver,
+                partitionIdleSafeTimePropagationPeriodMsSupplier
         );
 
         var logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
@@ -351,7 +355,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         var catalogManager = new CatalogManagerImpl(
                 new UpdateLogImpl(metaStorageMgr),
                 clockWaiter,
-                delayDurationMsSupplier
+                delayDurationMsSupplier,
+                partitionIdleSafeTimePropagationPeriodMsSupplier
         );
 
         CatalogSchemaManager schemaManager = new 
CatalogSchemaManager(registry, catalogManager, metaStorageMgr);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index 2fabd673ee..c6e9852eeb 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -68,6 +68,7 @@ import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionOptions;
 import org.hamcrest.Matcher;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -368,7 +369,7 @@ public abstract class ItSqlApiBaseTest extends 
ClusterPerClassIntegrationTest {
         Session ses = sql.createSession();
 
         for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+            executeForRead(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
         }
 
         List<Boolean> booleanList = List.of(Boolean.TRUE, Boolean.FALSE);
@@ -834,11 +835,11 @@ public abstract class ItSqlApiBaseTest extends 
ClusterPerClassIntegrationTest {
         assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
     }
 
-    protected ResultSet<SqlRow> executeForRead(Session ses, String query) {
-        return executeForRead(ses, null, query);
+    protected ResultSet<SqlRow> executeForRead(Session ses, String query, 
Object... args) {
+        return executeForRead(ses, null, query, args);
     }
 
-    protected abstract ResultSet<SqlRow> executeForRead(Session ses, 
Transaction tx, String query, Object... args);
+    protected abstract ResultSet<SqlRow> executeForRead(Session ses, @Nullable 
Transaction tx, String query, Object... args);
 
     protected <T extends IgniteException> T checkError(Class<T> expCls, 
Integer code, String msg, Session ses, String sql,
             Object... args) {
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 5cf0ace69a..bcfc1cf764 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -467,13 +467,16 @@ public class IgniteImpl implements Ignite {
                 clock
         );
 
+        LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = 
partitionIdleSafeTimePropagationPeriodMsSupplier();
+
         replicaMgr = new ReplicaManager(
                 name,
                 clusterSvc,
                 cmgMgr,
                 clock,
                 Set.of(TableMessageGroup.class, TxMessageGroup.class),
-                placementDriverMgr.placementDriver()
+                placementDriverMgr.placementDriver(),
+                partitionIdleSafeTimePropagationPeriodMsSupplier
         );
 
         
metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY));
@@ -511,7 +514,8 @@ public class IgniteImpl implements Ignite {
         CatalogManagerImpl catalogManager = new CatalogManagerImpl(
                 new UpdateLogImpl(metaStorageMgr),
                 clockWaiter,
-                delayDurationMsSupplier
+                delayDurationMsSupplier,
+                partitionIdleSafeTimePropagationPeriodMsSupplier
         );
 
         systemViewManager = new SystemViewManagerImpl(name, catalogManager);
@@ -544,7 +548,8 @@ public class IgniteImpl implements Ignite {
                 clock,
                 new TransactionIdGenerator(() -> 
clusterSvc.nodeName().hashCode()),
                 () -> clusterSvc.topologyService().localMember().id(),
-                placementDriverMgr.placementDriver()
+                placementDriverMgr.placementDriver(),
+                partitionIdleSafeTimePropagationPeriodMsSupplier
         );
 
         distributedTblMgr = new TableManager(
@@ -652,6 +657,11 @@ public class IgniteImpl implements Ignite {
         restComponent = createRestComponent(name);
     }
 
+    private static LongSupplier 
partitionIdleSafeTimePropagationPeriodMsSupplier() {
+        // TODO: Replace with an immutable dynamic property set on cluster 
init after IGNITE-20499 is fixed.
+        return ReplicaManager::idleSafeTimePropagationPeriodMs;
+    }
+
     private AuthenticationManager createAuthenticationManager() {
         SecurityConfiguration securityConfiguration = 
clusterCfgMgr.configurationRegistry()
                 .getConfiguration(SecurityConfiguration.KEY);
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 632c1388d6..bd98e922c7 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -153,6 +153,16 @@ public class Cluster {
         startAndInit(nodeCount, cmgNodes, builder -> {});
     }
 
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     * @param initParametersConfigurator Configure {@link InitParameters} 
before initializing the cluster.
+     */
+    public void startAndInit(int nodeCount, Consumer<InitParametersBuilder> 
initParametersConfigurator) {
+        startAndInit(nodeCount, new int[]{0}, initParametersConfigurator);
+    }
+
     /**
      * Starts the cluster with the given number of nodes and initializes it.
      *
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java
index 51d9e5ed8b..170bbde606 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.schema.configuration;
 
 import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
-import static 
org.apache.ignite.internal.replicator.ReplicaManager.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.annotation.Config;
@@ -39,7 +39,7 @@ public class LowWatermarkConfigurationSchema {
      * {@code now() - dataAvailabilityTime()}.
      */
     // TODO https://issues.apache.org/jira/browse/IGNITE-18977 Make these values configurable and create dynamic validator after that.
-    @Range(min = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW)
+    @Range(min = DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + 
CLOCK_SKEW)
     @Value(hasDefault = true)
     public long dataAvailabilityTime = TimeUnit.MINUTES.toMillis(45);
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index b3b6fc475f..cf5c315a35 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITED;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
@@ -47,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -130,6 +132,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
     private final PlacementDriver placementDriver;
 
+    private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
+
     /**
      * The constructor.
      *
@@ -145,6 +149,35 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             TransactionIdGenerator transactionIdGenerator,
             Supplier<String> localNodeIdSupplier,
             PlacementDriver placementDriver
+    ) {
+        this(
+                replicaService,
+                lockManager,
+                clock,
+                transactionIdGenerator,
+                localNodeIdSupplier,
+                placementDriver,
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
+        );
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param replicaService Replica service.
+     * @param lockManager Lock manager.
+     * @param clock A hybrid logical clock.
+     * @param transactionIdGenerator Used to generate transaction IDs.
+     * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms.
+     */
+    public TxManagerImpl(
+            ReplicaService replicaService,
+            LockManager lockManager,
+            HybridClock clock,
+            TransactionIdGenerator transactionIdGenerator,
+            Supplier<String> localNodeIdSupplier,
+            PlacementDriver placementDriver,
+            LongSupplier idleSafeTimePropagationPeriodMsSupplier
     ) {
         this.replicaService = replicaService;
         this.lockManager = lockManager;
@@ -152,6 +185,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         this.transactionIdGenerator = transactionIdGenerator;
         this.localNodeId = new Lazy<>(localNodeIdSupplier);
         this.placementDriver = placementDriver;
+        this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
 
         int cpus = Runtime.getRuntime().availableProcessors();
 
@@ -216,14 +250,13 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
      * @return Current read timestamp.
      */
     private HybridTimestamp currentReadTimestamp() {
-        return clock.now();
-
-        // TODO: IGNITE-20378 Fix it
-        // return new HybridTimestamp(now.getPhysical()
-        //         - ReplicaManager.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
-        //         - HybridTimestamp.CLOCK_SKEW,
-        //         0
-        // );
+        HybridTimestamp now = clock.now();
+
+        return new HybridTimestamp(now.getPhysical()
+                - idleSafeTimePropagationPeriodMsSupplier.getAsLong()
+                - HybridTimestamp.CLOCK_SKEW,
+                0
+        );
     }
 
     @Override
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 86f29a7aa1..359e9da571 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -19,9 +19,10 @@ package org.apache.ignite.internal.tx;
 
 
 import static java.lang.Math.abs;
+import static 
org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import static 
org.apache.ignite.internal.replicator.ReplicaManager.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.idleSafeTimePropagationPeriodMs;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
@@ -51,6 +52,7 @@ import 
org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
@@ -64,7 +66,6 @@ import org.apache.ignite.tx.TransactionException;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -270,11 +271,11 @@ public class TxManagerTest extends IgniteAbstractTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20378")
+    @WithSystemProperty(key = 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "1000")
     public void testObservableTimestamp() {
         long compareThreshold = 50;
         // Check that idle safe time propagation period is significantly greater than compareThreshold.
-        assertTrue(IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW 
> compareThreshold * 5);
+        assertTrue(idleSafeTimePropagationPeriodMs() + CLOCK_SKEW > 
compareThreshold * 5);
 
         HybridTimestamp now = clock.now();
 
@@ -291,7 +292,7 @@ public class TxManagerTest extends IgniteAbstractTest {
         tx.commit();
 
         HybridTimestamp timestampInPast = new HybridTimestamp(
-                now.getPhysical() - 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS * 2,
+                now.getPhysical() - idleSafeTimePropagationPeriodMs() * 2,
                 now.getLogical()
         );
 
@@ -301,7 +302,7 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         tx = txManager.begin(hybridTimestampTracker, true);
 
-        long readTime = now.getPhysical() - 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS - CLOCK_SKEW;
+        long readTime = now.getPhysical() - idleSafeTimePropagationPeriodMs() 
- CLOCK_SKEW;
 
         assertThat(abs(readTime - tx.readTimestamp().getPhysical()), 
Matchers.lessThan(compareThreshold));
 
@@ -309,11 +310,11 @@ public class TxManagerTest extends IgniteAbstractTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20378")
+    @WithSystemProperty(key = 
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "1000")
     public void testObservableTimestampLocally() {
         long compareThreshold = 50;
         // Check that idle safe time propagation period is significantly greater than compareThreshold.
-        assertTrue(IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW 
> compareThreshold * 5);
+        assertTrue(idleSafeTimePropagationPeriodMs() + CLOCK_SKEW > 
compareThreshold * 5);
 
         HybridTimestamp now = clock.now();
 
@@ -324,7 +325,7 @@ public class TxManagerTest extends IgniteAbstractTest {
         assertTrue(firstReadTs.compareTo(now) < 0);
 
         assertTrue(now.getPhysical() - firstReadTs.getPhysical() < 
compareThreshold
-                + IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW);
+                + idleSafeTimePropagationPeriodMs() + CLOCK_SKEW);
         tx.commit();
 
         tx = txManager.begin(hybridTimestampTracker, true);
@@ -332,7 +333,7 @@ public class TxManagerTest extends IgniteAbstractTest {
         assertTrue(firstReadTs.compareTo(tx.readTimestamp()) <= 0);
 
         assertTrue(abs(now.getPhysical() - tx.readTimestamp().getPhysical()) < 
compareThreshold
-                + IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW);
+                + idleSafeTimePropagationPeriodMs() + CLOCK_SKEW);
         tx.commit();
     }
 


Reply via email to