Cyrill commented on code in PR #2832:
URL: https://github.com/apache/ignite-3/pull/2832#discussion_r1392558279


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryProcessor.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * The processor receives transaction recovery messages. The recovery message 
is received when the node has a commit partition for the
+ * specific transaction.
+ */
+public class TxRecoveryProcessor {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(TxRecoveryProcessor.class);
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** The local map for tx states. */
+    private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
+
+    /**
+     * The constructor.
+     *
+     * @param clusterService Cluster service.
+     */
+    public TxRecoveryProcessor(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Starts the recovery processor.
+     *
+     * @param txStateMap Transaction state map.
+     */
+    public void start(ConcurrentHashMap<UUID, TxStateMeta> txStateMap) {
+        this.txStateMap = txStateMap;
+
+        
clusterService.messagingService().addMessageHandler(TxMessageGroup.class, (msg, 
sender, correlationId) -> {
+            if (msg instanceof TxRecoveryMessage) {
+                if (busyLock.enterBusy()) {
+                    try {
+                        processTxRecoveryMessage((TxRecoveryMessage) msg);
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                } else {
+                    LOG.info("Transaction recovery message was ignored 
[msg={}]", msg);
+                }
+            }
+        });
+    }
+
+    /**
+     * Processes the recovery transaction message.
+     *
+     * @param msg Transaction recovery message.
+     */
+    private void processTxRecoveryMessage(TxRecoveryMessage msg) {

Review Comment:
   Please help me to recollect the agreed process of handling the tx recovery 
message. 
   I thought the handler of this message was supposed to be the primary replica 
of the commit partition, but this code is executed on every node.
   What am I missing here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to