sashapolo commented on code in PR #4902:
URL: https://github.com/apache/ignite-3/pull/4902#discussion_r1916496880


##########
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxTimeoutOneNodeTest.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.readonly;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.tx.impl.ReadOnlyTransactionImpl;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+abstract class ItReadOnlyTxTimeoutOneNodeTest extends 
ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "TEST";
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    abstract Ignite ignite();
+
+    abstract ReadOnlyTransactionImpl transactionImpl(Transaction tx);
+
+    @Test
+    void roTransactionTimesOut() throws Exception {
+        Ignite ignite = ignite();
+
+        ignite.sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT 
PRIMARY KEY, VAL VARCHAR)");
+
+        Table table = ignite.tables().table(TABLE_NAME);
+
+        Transaction roTx = ignite.transactions().begin(new 
TransactionOptions().readOnly(true).timeoutMillis(100));
+
+        // Make sure the RO tx actually begins on the server (as thin client 
transactions are lazy).
+        doGetOn(table, roTx);
+
+        assertTrue(
+                waitForCondition(() -> 
transactionImpl(roTx).isFinishingOrFinished(), SECONDS.toMillis(10)),
+                "Transaction should have been finished due to timeout"
+        );
+
+        // TODO: Uncomment the following lines after 
https://issues.apache.org/jira/browse/IGNITE-23980 is fixed.

Review Comment:
   Looks like this ticket is already fixed



##########
modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java:
##########
@@ -39,10 +39,17 @@ public long timeoutMillis() {
     /**
      * Sets transaction timeout, in milliseconds.
      *
-     * @param timeoutMillis Transaction timeout, in milliseconds.
+     * @param timeoutMillis Transaction timeout, in milliseconds. Cannot be 
negative; 0 means 'use default timeout'.
+     *     For RO transactions, the default timeout is data availability time 
configured via ignite.gc.lowWatermark.dataAvailabilityTime
+     *     configuration setting.
+     *     For RW transactions, timeouts are not supported yet. TODO: 
IGNITE-15936

Review Comment:
   Let's extract this TODO into a separate comment (outside of the Javadoc) so 
that it is more noticeable (I also wonder whether the current approach will work 
with the TODO detection tool).



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class TransactionExpirationRegistryTest extends BaseIgniteAbstractTest {
+    private final TransactionExpirationRegistry registry = new 
TransactionExpirationRegistry();
+
+    @Mock
+    private InternalTransaction tx1;
+
+    @Mock
+    private InternalTransaction tx2;
+
+    @BeforeEach
+    void configureMocks() {
+        lenient().when(tx1.rollbackAsync()).thenReturn(nullCompletedFuture());
+        lenient().when(tx2.rollbackAsync()).thenReturn(nullCompletedFuture());
+    }
+
+    @Test
+    void abortsTransactionsBeforeExpirationTime() {
+        registry.register(tx1, new HybridTimestamp(1000, 0));
+        registry.register(tx2, new HybridTimestamp(2000, 0));
+
+        registry.expireUpTo(new HybridTimestamp(3000, 0));
+
+        verify(tx1).rollbackAsync();
+        verify(tx2).rollbackAsync();
+    }
+
+    @Test
+    void abortsTransactionsExactlyOnExpirationTime() {
+        registry.register(tx1, new HybridTimestamp(1000, 0));
+
+        registry.expireUpTo(new HybridTimestamp(1000, 0));
+
+        verify(tx1).rollbackAsync();
+    }
+
+    @Test
+    void doesNotAbortTransactionsAfterExpirationTime() {
+        registry.register(tx1, new HybridTimestamp(1000, 1));
+
+        registry.expireUpTo(new HybridTimestamp(1000, 0));
+
+        verify(tx1, never()).rollbackAsync();
+    }
+
+    @Test
+    void abortsTransactionsExpiredAfterFewExpirations() {
+        registry.register(tx1, new HybridTimestamp(1000, 1));
+
+        registry.expireUpTo(new HybridTimestamp(1000, 0));
+        registry.expireUpTo(new HybridTimestamp(2000, 0));
+
+        verify(tx1).rollbackAsync();
+    }
+
+    @Test
+    void abortsAlreadyExpiredTransactionOnRegistration() {
+        registry.expireUpTo(new HybridTimestamp(2000, 0));
+
+        registry.register(tx1, new HybridTimestamp(1000, 0));
+        registry.register(tx2, new HybridTimestamp(2000, 0));
+
+        verify(tx1).rollbackAsync();
+        verify(tx2).rollbackAsync();
+    }
+
+    @Test
+    void abortsAlreadyExpiredTransactionJustOnce() {
+        registry.expireUpTo(new HybridTimestamp(2000, 0));
+
+        registry.register(tx1, new HybridTimestamp(1000, 0));
+        registry.register(tx2, new HybridTimestamp(2000, 0));
+
+        registry.expireUpTo(new HybridTimestamp(2000, 0));
+
+        verify(tx1, times(1)).rollbackAsync();
+        verify(tx2, times(1)).rollbackAsync();
+    }
+
+    @Test
+    void abortsAllRegistered() {
+        registry.register(tx1, new HybridTimestamp(1000, 0));
+        registry.register(tx2, HybridTimestamp.MAX_VALUE);
+
+        registry.abortAllRegistered();
+
+        verify(tx1).rollbackAsync();
+        verify(tx2).rollbackAsync();
+    }
+
+    @Test
+    void abortsOnRegistrationAfterAbortingAllRegistered() {
+        registry.abortAllRegistered();
+
+        registry.register(tx1, new HybridTimestamp(1000, 0));
+        registry.register(tx2, HybridTimestamp.MAX_VALUE);
+
+        verify(tx1).rollbackAsync();
+        verify(tx2).rollbackAsync();
+    }
+
+    @Test
+    void removesTransactionOnUnregister() {
+        registry.register(tx1, new HybridTimestamp(1000, 0));
+
+        registry.unregister(tx1);
+
+        registry.expireUpTo(new HybridTimestamp(2000, 0));
+
+        // Should not be aborted due to expiration as we removed the 
transaction.
+        verify(tx1, never()).rollbackAsync();
+    }
+
+    @Test
+    void unregisterIsIdempotent() {
+        registry.register(tx1, new HybridTimestamp(1000, 0));
+
+        registry.unregister(tx1);
+        registry.unregister(tx1);

Review Comment:
   Let's add `assertDoesNotThrow` to make the intention clear



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.InternalTransaction;
+
+class TransactionExpirationRegistry {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TransactionExpirationRegistry.class);
+
+    private final NavigableMap<HybridTimestamp, Set<InternalTransaction>> 
txsByExpirationTime = new ConcurrentSkipListMap<>();
+    private final Map<InternalTransaction, HybridTimestamp> expirationTimeByTx 
= new ConcurrentHashMap<>();
+
+    private final ReadWriteLock watermarkLock = new ReentrantReadWriteLock();
+    private volatile HybridTimestamp watermark = HybridTimestamp.MIN_VALUE;
+
+    void register(InternalTransaction tx, HybridTimestamp txExpirationTime) {
+        if (isExpired(txExpirationTime)) {
+            abortTransaction(tx);
+            return;
+        }
+
+        watermarkLock.readLock().lock();
+
+        try {
+            if (isExpired(txExpirationTime)) {
+                abortTransaction(tx);
+                return;
+            }
+
+            Set<InternalTransaction> txsExpiringAtTs = 
txsByExpirationTime.computeIfAbsent(
+                    txExpirationTime,
+                    k -> ConcurrentHashMap.newKeySet()
+            );
+            txsExpiringAtTs.add(tx);
+
+            expirationTimeByTx.put(tx, txExpirationTime);
+        } finally {
+            watermarkLock.readLock().unlock();
+        }
+    }
+
+    private boolean isExpired(HybridTimestamp expirationTime) {
+        return expirationTime.compareTo(watermark) <= 0;
+    }
+
+    private static void abortTransaction(InternalTransaction tx) {
+        tx.rollbackAsync().whenComplete((res, ex) -> {
+            if (ex != null) {
+                LOG.error("Transaction abort due to timeout failed [txId={}]", 
ex, tx.id());
+            }
+        });
+    }
+
+    void expireUpTo(HybridTimestamp expirationTime) {
+        List<Set<InternalTransaction>> transactionSetsToExpire;
+
+        watermarkLock.writeLock().lock();
+
+        try {
+            NavigableMap<HybridTimestamp, Set<InternalTransaction>> headMap = 
txsByExpirationTime.headMap(expirationTime, true);
+            transactionSetsToExpire = new ArrayList<>(headMap.values());
+            headMap.clear();
+
+            watermark = expirationTime;
+        } finally {
+            watermarkLock.writeLock().unlock();
+        }
+
+        for (Set<InternalTransaction> set : transactionSetsToExpire) {
+            for (InternalTransaction tx : set) {
+                expirationTimeByTx.remove(tx);
+
+                abortTransaction(tx);
+            }
+        }
+    }
+
+    void abortAllRegistered() {
+        expireUpTo(HybridTimestamp.MAX_VALUE);
+    }
+
+    void unregister(InternalTransaction tx) {
+        HybridTimestamp expirationTime = expirationTimeByTx.remove(tx);
+
+        if (expirationTime != null) {
+            txsByExpirationTime.compute(expirationTime, (k, set) -> {
+                if (set == null) {
+                    return null;
+                }
+
+                set.remove(tx);

Review Comment:
   Do I understand correctly that most of the time this set will only contain 
one element? Should we optimize for it?



##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java:
##########
@@ -165,7 +165,10 @@ private synchronized CompletableFuture<ClientTransaction> 
ensureStarted(
         return tx0;
     }
 
-    ClientTransaction startedTx() {
+    /**
+     * Returns actual {@link ClientTransaction} started by this transaction or 
throws an exception if no transaction was started yet.

Review Comment:
   What exception does it throw?



##########
modules/api/src/main/java/org/apache/ignite/tx/TransactionOptions.java:
##########
@@ -39,10 +39,17 @@ public long timeoutMillis() {
     /**
      * Sets transaction timeout, in milliseconds.
      *
-     * @param timeoutMillis Transaction timeout, in milliseconds.
+     * @param timeoutMillis Transaction timeout, in milliseconds. Cannot be 
negative; 0 means 'use default timeout'.
+     *     For RO transactions, the default timeout is data availability time 
configured via ignite.gc.lowWatermark.dataAvailabilityTime

Review Comment:
   Could you please check that this javadoc renders in a nice way? I'm pretty 
sure we need some `<p>` tags



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistry.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.InternalTransaction;
+
+class TransactionExpirationRegistry {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TransactionExpirationRegistry.class);
+
+    private final NavigableMap<HybridTimestamp, Set<InternalTransaction>> 
txsByExpirationTime = new ConcurrentSkipListMap<>();
+    private final Map<InternalTransaction, HybridTimestamp> expirationTimeByTx 
= new ConcurrentHashMap<>();
+
+    private final ReadWriteLock watermarkLock = new ReentrantReadWriteLock();
+    private volatile HybridTimestamp watermark = HybridTimestamp.MIN_VALUE;
+
+    void register(InternalTransaction tx, HybridTimestamp txExpirationTime) {
+        if (isExpired(txExpirationTime)) {
+            abortTransaction(tx);
+            return;
+        }
+
+        watermarkLock.readLock().lock();
+
+        try {
+            if (isExpired(txExpirationTime)) {
+                abortTransaction(tx);
+                return;
+            }
+
+            Set<InternalTransaction> txsExpiringAtTs = 
txsByExpirationTime.computeIfAbsent(
+                    txExpirationTime,
+                    k -> ConcurrentHashMap.newKeySet()
+            );
+            txsExpiringAtTs.add(tx);
+
+            expirationTimeByTx.put(tx, txExpirationTime);
+        } finally {
+            watermarkLock.readLock().unlock();
+        }
+    }
+
+    private boolean isExpired(HybridTimestamp expirationTime) {
+        return expirationTime.compareTo(watermark) <= 0;
+    }
+
+    private static void abortTransaction(InternalTransaction tx) {
+        tx.rollbackAsync().whenComplete((res, ex) -> {
+            if (ex != null) {
+                LOG.error("Transaction abort due to timeout failed [txId={}]", 
ex, tx.id());
+            }
+        });
+    }
+
+    void expireUpTo(HybridTimestamp expirationTime) {
+        List<Set<InternalTransaction>> transactionSetsToExpire;
+
+        watermarkLock.writeLock().lock();
+
+        try {
+            NavigableMap<HybridTimestamp, Set<InternalTransaction>> headMap = 
txsByExpirationTime.headMap(expirationTime, true);
+            transactionSetsToExpire = new ArrayList<>(headMap.values());
+            headMap.clear();
+
+            watermark = expirationTime;
+        } finally {
+            watermarkLock.writeLock().unlock();
+        }
+
+        for (Set<InternalTransaction> set : transactionSetsToExpire) {
+            for (InternalTransaction tx : set) {
+                expirationTimeByTx.remove(tx);
+
+                abortTransaction(tx);
+            }
+        }
+    }
+
+    void abortAllRegistered() {
+        expireUpTo(HybridTimestamp.MAX_VALUE);
+    }
+
+    void unregister(InternalTransaction tx) {
+        HybridTimestamp expirationTime = expirationTimeByTx.remove(tx);
+
+        if (expirationTime != null) {
+            txsByExpirationTime.compute(expirationTime, (k, set) -> {
+                if (set == null) {

Review Comment:
   I guess you can use `computeIfPresent`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to