denis-chudov commented on code in PR #2832:
URL: https://github.com/apache/ignite-3/pull/2832#discussion_r1392421835


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -179,13 +179,16 @@ private static class LockState {
         /** Marked for removal flag. */
         private boolean markedForRemove = false;
 
-        public LockState(DeadlockPreventionPolicy deadlockPreventionPolicy, Executor delayedExecutor) {
+        private final LockKey lockKey;

Review Comment:
   Seems that this field is not used.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Abundant transactions integration tests.

Review Comment:
   Did you mean "abandoned"?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -349,9 +332,17 @@ public CompletableFuture<Void> finish(
         // than all the read timestamps processed before.
         // Every concurrent operation will now use a finish future from the finishing state meta and get only final transaction
         // state after the transaction is finished.
-        TxStateMetaFinishing finishingStateMeta = new TxStateMetaFinishing(coordinatorId());
+        AtomicReference<TxStateMetaFinishing> finishingStateMetaRef = new AtomicReference<>();
+
+        updateTxMeta(txId, old -> {
+            var finishingState = new TxStateMetaFinishing(localNodeId, old == null ? null : old.commitPartitionId());
 
-        updateTxMeta(txId, old -> finishingStateMeta);
+            finishingStateMetaRef.set(finishingState);
+
+            return finishingState;
+        });

Review Comment:
   I think it would be better to make `updateTxMeta` return the state that was 
actually set.
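
   One way to read this suggestion, as a minimal sketch: `ConcurrentHashMap.compute` already returns the value it stored, so `updateTxMeta` could pass that value through and the caller would not need an `AtomicReference`. The `TxMeta` class, the string-typed fields, and the method signature below are simplified stand-ins for illustration, not the actual `TxManagerImpl` code.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical, simplified holder of the local tx state map. */
final class UpdateTxMetaSketch {
    /** Hypothetical stand-in for TxStateMeta; fields are plain strings for brevity. */
    static final class TxMeta {
        final String state;
        final String commitPartitionId;

        TxMeta(String state, String commitPartitionId) {
            this.state = state;
            this.commitPartitionId = commitPartitionId;
        }
    }

    private final ConcurrentHashMap<UUID, TxMeta> txStateMap = new ConcurrentHashMap<>();

    /** Applies the updater atomically and returns the value that was actually stored. */
    TxMeta updateTxMeta(UUID txId, Function<TxMeta, TxMeta> updater) {
        return txStateMap.compute(txId, (id, old) -> updater.apply(old));
    }

    TxMeta get(UUID txId) {
        return txStateMap.get(txId);
    }

    public static void main(String[] args) {
        UpdateTxMetaSketch mgr = new UpdateTxMetaSketch();
        UUID txId = UUID.randomUUID();

        mgr.updateTxMeta(txId, old -> new TxMeta("PENDING", "part-0"));

        // The caller receives the stored state directly from the update call.
        TxMeta finishing = mgr.updateTxMeta(
                txId,
                old -> new TxMeta("FINISHING", old == null ? null : old.commitPartitionId));

        System.out.println(finishing.state + " " + finishing.commitPartitionId);
    }
}
```

   With a return value like this, the lambda in `finish` would no longer need to write into an outer reference as a side effect.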



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java:
##########
@@ -30,22 +33,28 @@ public class TxStateMeta implements TransactionMeta {
 
     private final String txCoordinatorId;
 
+    /** Identifier of the replication group that manages a transaction state. */
+    private ReplicationGroupId commitPartitionId;

Review Comment:
   `TablePartitionId` would be more useful here.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java:
##########
@@ -116,7 +116,7 @@ public CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionT
         return ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new TxIdAndTimestamp(readTimestamp, id()))
                 .thenRun(() -> txManager.updateTxMeta(
                         id(),
-                        old -> new TxStateMeta(COMMITED, old.txCoordinatorId(), old.commitTimestamp())
+                        old -> new TxStateMeta(COMMITED, old.txCoordinatorId(), null, old.commitTimestamp())

Review Comment:
   The commit partition should be taken from the old value here; otherwise it
can be lost.
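
   A minimal sketch of the point, assuming the four-argument constructor order shown in the diff (state, coordinator, commit partition, commit timestamp). The `Meta` class and the `markCommitted` helper are hypothetical stand-ins; the real `TxStateMeta` uses richer types.

```java
/** Hypothetical illustration of carrying the commit partition over on a state update. */
final class CommitPartitionCarryOverSketch {
    /** Simplified stand-in for TxStateMeta. */
    static final class Meta {
        final String state;
        final String txCoordinatorId;
        final String commitPartitionId;
        final long commitTimestamp;

        Meta(String state, String txCoordinatorId, String commitPartitionId, long commitTimestamp) {
            this.state = state;
            this.txCoordinatorId = txCoordinatorId;
            this.commitPartitionId = commitPartitionId;
            this.commitTimestamp = commitTimestamp;
        }
    }

    /** Builds the committed meta from the old one, keeping the commit partition instead of passing null. */
    static Meta markCommitted(Meta old) {
        // "COMMITED" is spelled as in the diff under review.
        return new Meta("COMMITED", old.txCoordinatorId, old.commitPartitionId, old.commitTimestamp);
    }

    public static void main(String[] args) {
        Meta old = new Meta("PENDING", "node-1", "part-0", 42L);
        Meta committed = markCommitted(old);

        System.out.println(committed.state + " " + committed.commitPartitionId);
    }
}
```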



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Abundant transactions integration tests.
+ */
+public class ItTransactionConflictTest extends ClusterPerTestIntegrationTest {
+    /** Table name. */
+    private static final String TABLE_NAME = "test_table";
+
+    @BeforeEach
+    @Override
+    public void setup(TestInfo testInfo) throws Exception {
+        super.setup(testInfo);
+
+        String zoneSql = "create zone test_zone with partitions=1, replicas=3";
+        String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20)) with primary_zone='TEST_ZONE'";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(zoneSql, session);
+            executeUpdate(sql, session);
+        });
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20773")
+    public void test() throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica(
+                tblReplicationGrp,
+                node(0).clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+
+        IgniteImpl commitPartNode = IntStream.range(0, initialNodes()).mapToObj(this::node).filter(n -> leaseholder.equals(n.name()))
+                .findFirst().get();
+
+        log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
+
+        IgniteImpl txCrdNode = IntStream.range(1, initialNodes()).mapToObj(this::node).filter(n -> !leaseholder.equals(n.name()))
+                .findFirst().get();
+
+        log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
+
+        UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
+
+        CompletableFuture<UUID> recoveryTxMsgCaptureFut = new CompletableFuture<>();
+
+

Review Comment:
   Extra empty line.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * The class detects transactions that are left without a coordinator but still hold locks. For that orphan transaction, the recovery
+ * message is sent to the commit partition replication group.
+ */
+public class OrphanDetector {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(OrphanDetector.class);
+
+    /** Tx messages factory. */
+    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
+    private static final long AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC = 10;
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    private final PlacementDriver placementDriver;
+
+    private final HybridClock clock;
+
+    /** The local map for tx states. */
+    private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
+
+    /**
+     * The constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param placementDriver Placement driver.
+     * @param clock Clock.
+     */
+    public OrphanDetector(ClusterService clusterService, PlacementDriver placementDriver, HybridClock clock) {
+        this.clusterService = clusterService;
+        this.placementDriver = placementDriver;
+        this.clock = clock;
+    }
+
+
+    /**
+     * Starts the detector.
+     *
+     * @param txStateMap Transaction state map.
+     */
+    public void start(ConcurrentHashMap<UUID, TxStateMeta> txStateMap) {
+        this.txStateMap = txStateMap;
+        // Subscribe to lock conflicts here.
+    }
+
+    /**
+     * Stops the detector.
+     */
+    public void stop() {
+        busyLock.block();
+        // Unsubscribe from lock conflicts here.
+    }
+
+    /**
+     * Sends {@link TxRecoveryMessage} if the transaction is orphaned.
+     *
+     * @param txId Transaction id that holds a lock.
+     * @return Future to complete.
+     */
+    private CompletableFuture<Void> handleLockHolder(UUID txId) {

Review Comment:
   Again, private method with no usages. Is it supposed to be called on lock 
conflict?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * The class detects transactions that are left without a coordinator but still hold locks. For that orphan transaction, the recovery
+ * message is sent to the commit partition replication group.
+ */
+public class OrphanDetector {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(OrphanDetector.class);
+
+    /** Tx messages factory. */
+    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
+    private static final long AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC = 10;
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    private final PlacementDriver placementDriver;
+
+    private final HybridClock clock;
+
+    /** The local map for tx states. */
+    private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
+
+    /**
+     * The constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param placementDriver Placement driver.
+     * @param clock Clock.
+     */
+    public OrphanDetector(ClusterService clusterService, PlacementDriver placementDriver, HybridClock clock) {
+        this.clusterService = clusterService;
+        this.placementDriver = placementDriver;
+        this.clock = clock;
+    }
+
+

Review Comment:
   Extra empty line.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * The class detects transactions that are left without a coordinator but still hold locks. For that orphan transaction, the recovery
+ * message is sent to the commit partition replication group.
+ */
+public class OrphanDetector {

Review Comment:
   I'm not sure this name reflects what the class does. It initiates the
recovery process; it doesn't detect anything. Even so, its purpose is unclear
because the class has no usages. Is it supposed to be triggered by the lock
manager on a lock conflict? It seems that this part was forgotten.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryProcessor.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * The processor receives transaction recovery messages. The recovery message is received when the node has a commit partition for the
+ * specific transaction.
+ */
+public class TxRecoveryProcessor {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(TxRecoveryProcessor.class);
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** The local map for tx states. */
+    private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
+
+    /**
+     * The constructor.
+     *
+     * @param clusterService Cluster service.
+     */
+    public TxRecoveryProcessor(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Starts the recovery processor.
+     *
+     * @param txStateMap Transaction state map.
+     */
+    public void start(ConcurrentHashMap<UUID, TxStateMeta> txStateMap) {
+        this.txStateMap = txStateMap;
+
+        clusterService.messagingService().addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> {
+            if (msg instanceof TxRecoveryMessage) {
+                if (busyLock.enterBusy()) {
+                    try {
+                        processTxRecoveryMessage((TxRecoveryMessage) msg);
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                } else {
+                    LOG.info("Transaction recovery message was ignored [msg={}]", msg);
+                }
+            }
+        });
+    }
+
+    /**
+     * Processes the recovery transaction message.
+     *
+     * @param msg Transaction recovery message.
+     */
+    private void processTxRecoveryMessage(TxRecoveryMessage msg) {
+        TxStateMeta txState = txStateMap.get(msg.txId());
+
+        if (txState == null || !TxState.isFinalState(txState.txState())) {
+            if (txState != null && clusterService.topologyService().getById(txState.txCoordinatorId()) == null) {

Review Comment:
   Why should the liveness of the coordinator depend on the txState?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryProcessor.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * The processor receives transaction recovery messages. The recovery message is received when the node has a commit partition for the
+ * specific transaction.
+ */
+public class TxRecoveryProcessor {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(TxRecoveryProcessor.class);
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** The local map for tx states. */
+    private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
+
+    /**
+     * The constructor.
+     *
+     * @param clusterService Cluster service.
+     */
+    public TxRecoveryProcessor(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Starts the recovery processor.
+     *
+     * @param txStateMap Transaction state map.
+     */
+    public void start(ConcurrentHashMap<UUID, TxStateMeta> txStateMap) {
+        this.txStateMap = txStateMap;
+
+        clusterService.messagingService().addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> {
+            if (msg instanceof TxRecoveryMessage) {
+                if (busyLock.enterBusy()) {
+                    try {
+                        processTxRecoveryMessage((TxRecoveryMessage) msg);
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                } else {
+                    LOG.info("Transaction recovery message was ignored [msg={}]", msg);
+                }
+            }
+        });
+    }
+
+    /**
+     * Processes the recovery transaction message.
+     *
+     * @param msg Transaction recovery message.
+     */
+    private void processTxRecoveryMessage(TxRecoveryMessage msg) {
+        TxStateMeta txState = txStateMap.get(msg.txId());
+
+        if (txState == null || !TxState.isFinalState(txState.txState())) {
+            if (txState != null && clusterService.topologyService().getById(txState.txCoordinatorId()) == null) {
+                LOG.warn(
+                        "The transaction coordinator is alive, which is why the recovery message is ignored [tx={}, crd={}].",

Review Comment:
   Seems that this is inconsistent with the condition: 
`clusterService.topologyService().getById(txState.txCoordinatorId()) == null`.
   Also, I would rephrase the message: `The transaction coordinator is alive, 
so the recovery message is ignored`
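
   To illustrate the mismatch, a small sketch under the assumption that a topology lookup returns `null` for a node that has left the cluster: "coordinator is alive" then corresponds to `!= null`, not `== null`. The `getById` helper and the `Set`-based topology are hypothetical stand-ins for the real topology service.

```java
import java.util.Set;

/** Hypothetical illustration of matching the liveness check to the log message. */
final class CoordinatorLivenessSketch {
    /** Stand-in topology lookup: returns the node id if it is in the topology, otherwise null. */
    static String getById(Set<String> topology, String nodeId) {
        return topology.contains(nodeId) ? nodeId : null;
    }

    /** Returns true when the recovery message should be ignored because the coordinator is alive. */
    static boolean shouldIgnoreRecovery(Set<String> topology, String txCoordinatorId) {
        // A non-null lookup result means the coordinator is still in the topology, i.e. alive.
        return getById(topology, txCoordinatorId) != null;
    }

    public static void main(String[] args) {
        Set<String> topology = Set.of("node-1", "node-2");

        if (shouldIgnoreRecovery(topology, "node-1")) {
            System.out.println("The transaction coordinator is alive, so the recovery message is ignored");
        }

        if (!shouldIgnoreRecovery(topology, "node-3")) {
            System.out.println("The transaction coordinator left the topology, recovery can proceed");
        }
    }
}
```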



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -590,7 +585,16 @@ public int pending() {
 
     @Override
     public void start() {
-        replicaService.messagingService().addMessageHandler(ReplicaMessageGroup.class, this);
+        localNodeId = clusterService.topologyService().localMember().id();
+        clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, this);

Review Comment:
   What is this? Does it fix some bug? I see that `onReceived` was added before 
but never used



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
