ascherbakoff commented on code in PR #7779:
URL: https://github.com/apache/ignite-3/pull/7779#discussion_r2945890867


##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTxPartitionEnlistmentCleaner.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.tx;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+
+/**
+ * Helper class to clean up direct transaction enlistments on the server side 
for a client connection.
+ */
+public class ClientTxPartitionEnlistmentCleaner {
+    private final UUID txId;
+
+    private final TxManager txManager;
+
+    private final IgniteTablesInternal igniteTables;
+
+    private final Map<ZonePartitionId, PendingTxPartitionEnlistment> 
enlistedPartitions = new ConcurrentHashMap<>();
+
+    /**
+     * Creates a new instance of the transaction partition enlistment cleaner.
+     *
+     * @param txId Transaction ID.
+     * @param txManager Transaction manager.
+     * @param igniteTables Ignite tables.
+     */
+    public ClientTxPartitionEnlistmentCleaner(UUID txId, TxManager txManager, 
IgniteTablesInternal igniteTables) {
+        this.txId = txId;
+        this.txManager = txManager;
+        this.igniteTables = igniteTables;
+    }
+
+    /**
+     * Adds a partition enlistment for the given table and partition.
+     *
+     * @param tableId Table ID.
+     * @param partId Partition ID.
+     */
+    public void addEnlistment(int tableId, int partId) {
+        TableViewInternal table = igniteTables.cachedTable(tableId);
+
+        if (table != null) {
+            ZonePartitionId replicationGroupId = 
table.internalTable().targetReplicationGroupId(partId);
+            enlistedPartitions.computeIfAbsent(replicationGroupId, k -> new 
PendingTxPartitionEnlistment(null, 0))
+                    .addTableId(tableId);
+        }
+    }
+
+    /**
+     * Discards local write intents for all enlisted partitions.
+     *
+     * @return Future that completes when cleanup is done.
+     */
+    public CompletableFuture<Void> clean() {
+        List<EnlistedPartitionGroup> enlistedPartitionGroups = 
enlistedPartitions.entrySet().stream()

Review Comment:
   I believe it is not safe to simply drop the transaction part on client's 
disconnect.
   This means locks can be dropped while the transaction is still active, 
causing data corruption.
   We need first to mark the transaction as aborted on a commit partition.
   @denis-chudov Can you provide the required method to achieve this ?
   



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionCleanupTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest.generateKeysForNode;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.client.table.ClientTable;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for client transaction cleanup on disconnect.
+ */
+@SuppressWarnings({"resource", "DataFlowIssue"})
+public class ItThinClientTransactionCleanupTest extends 
ItAbstractThinClientTest {
+    /**
+     * Tests that locks are released when client disconnects with a 
transaction having direct enlistments.
+     */
+    @Test
+    void testClientDisconnectReleasesTxLocksFast() {
+        try (IgniteClient client = 
IgniteClient.builder().addresses(getClientAddresses().toArray(new 
String[0])).build()) {
+            var table = (ClientTable) client.tables().table(TABLE_NAME);
+            Map<Partition, ClusterNode> map = 
table.partitionDistribution().primaryReplicas();
+
+            IgniteImpl server0 = unwrapIgniteImpl(server(0));
+            IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+            List<Tuple> tuples0 = generateKeysForNode(300, 1, map, 
server0.cluster().localNode(), table);
+            List<Tuple> tuples1 = generateKeysForNode(310, 1, map, 
server1.cluster().localNode(), table);
+
+            // Perform a mix of operations to trigger direct tx logic.
+            Map<Tuple, Tuple> data = new HashMap<>();
+
+            data.put(tuples0.get(0), val(tuples0.get(0).intValue(0) + ""));
+            data.put(tuples1.get(0), val(tuples1.get(0).intValue(0) + ""));
+
+            ClientLazyTransaction tx0 = (ClientLazyTransaction) 
client.transactions().begin();
+
+            table.keyValueView().putAll(tx0, data);
+
+            for (Entry<Tuple, Tuple> entry : data.entrySet()) {
+                table.keyValueView().put(tx0, entry.getKey(), 
entry.getValue());
+            }
+
+            assertThat(txLockCount(), greaterThanOrEqualTo(2));
+
+            // Disconnect without commit or rollback.
+        }
+
+        await().atMost(Duration.ofSeconds(2))
+                .untilAsserted(() -> assertEquals(0, txLockCount()));
+    }
+
+    private int txLockCount() {
+        int count = 0;
+
+        for (int i = 0; i < nodes(); i++) {
+            IgniteImpl ignite = unwrapIgniteImpl(server(i));
+            LockManager lockManager = ignite.txManager().lockManager();
+
+            var iter = lockManager.locks();

Review Comment:
   You can use `CollectionUtils.count(locks)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to