This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ea454d40b4 IGNITE-22045 Fix ItComputeApiThreadingTest (#3607)
ea454d40b4 is described below

commit ea454d40b4ea150a357228aae5edc53308a1eb52
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Mon Apr 15 19:11:10 2024 +0400

    IGNITE-22045 Fix ItComputeApiThreadingTest (#3607)
---
 .../threading/ItComputeApiThreadingTest.java       |  2 +-
 .../internal/network/NettyWorkersRegistrar.java    | 31 ++++++++++++++++++----
 .../ignite/internal/replicator/ReplicaManager.java |  4 +--
 .../runner/app/ItIgniteNodeRestartTest.java        |  3 ++-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  3 ++-
 5 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
index 287db4381d..1d8ac1d95c 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
@@ -99,7 +99,7 @@ class ItComputeApiThreadingTest extends ClusterPerClassIntegrationTest {
         CompletableFuture<Thread> completerFuture = 
operation.executeOn(compute)
                 .thenApply(unused -> currentThread());
 
-        assertThat(completerFuture, willBe(anIgniteThread()));
+        assertThat(completerFuture, 
willBe(either(is(currentThread())).or(anIgniteThread())));
     }
 
     private static IgniteCompute computeForInternalUse() {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
index d164cbd74a..d9246f24cc 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
@@ -29,6 +29,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.FailureType;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.worker.CriticalWorker;
 import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
@@ -39,6 +44,8 @@ import org.jetbrains.annotations.Nullable;
  * This component is responsible for registering Netty workers with the {@link 
CriticalWorkerRegistry} and for updating their heartbeats.
  */
 public class NettyWorkersRegistrar implements IgniteComponent {
+    private static final IgniteLogger LOG = 
Loggers.forClass(NettyWorkersRegistrar.class);
+
     /*
      * It seems impossible to instrument tasks executed by Netty event loops, 
so the strategy we use is to
      * send 'heartbeat' tasks to each of the event loops periodically; these 
tasks just update the heartbeat timestamp
@@ -54,6 +61,8 @@ public class NettyWorkersRegistrar implements IgniteComponent {
 
     private final CriticalWorkersConfiguration criticalWorkersConfiguration;
 
+    private final FailureProcessor failureProcessor;
+
     private volatile List<NettyWorker> workers;
 
     @Nullable
@@ -66,17 +75,20 @@ public class NettyWorkersRegistrar implements IgniteComponent {
      *         blocked.
      * @param scheduler Used to schedule periodic tasks.
      * @param bootstrapFactory Used to obtain Netty workers.
+     * @param failureProcessor Used to process failures.
      */
     public NettyWorkersRegistrar(
             CriticalWorkerRegistry criticalWorkerRegistry,
             ScheduledExecutorService scheduler,
             NettyBootstrapFactory bootstrapFactory,
-            CriticalWorkersConfiguration criticalWorkersConfiguration
+            CriticalWorkersConfiguration criticalWorkersConfiguration,
+            FailureProcessor failureProcessor
     ) {
         this.criticalWorkerRegistry = criticalWorkerRegistry;
         this.scheduler = scheduler;
         this.bootstrapFactory = bootstrapFactory;
         this.criticalWorkersConfiguration = criticalWorkersConfiguration;
+        this.failureProcessor = failureProcessor;
     }
 
     @Override
@@ -109,12 +121,21 @@ public class NettyWorkersRegistrar implements IgniteComponent {
     }
 
     private void sendHearbeats() {
-        try {
-            for (NettyWorker worker : workers) {
+        for (NettyWorker worker : workers) {
+            try {
                 worker.sendHeartbeat();
+            } catch (Exception | AssertionError e) {
+                LOG.warn("Cannot send a heartbeat to a Netty thread 
[threadId={}].", e, worker.threadId());
+            } catch (Error e) {
+                LOG.error(
+                        "Cannot send a heartbeat to a Netty thread, no more 
heartbeats will be sent [threadId={}].",
+                        e, worker.threadId()
+                );
+
+                failureProcessor.process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+
+                throw e;
             }
-        } catch (Exception | AssertionError ignored) {
-            // Ignore.
         }
     }
 
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index ff1280f818..883fa59647 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -777,9 +777,9 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
             try {
                 sendSafeTimeSyncIfReplicaReady(entry.getValue());
             } catch (Exception | AssertionError e) {
-                LOG.warn("Error while trying to send a safe time sync request 
to {}", e, entry.getKey());
+                LOG.warn("Error while trying to send a safe time sync request 
[groupId={}]", e, entry.getKey());
             } catch (Error e) {
-                LOG.error("Error while trying to send a safe time sync request 
to {}", e, entry.getKey());
+                LOG.error("Error while trying to send a safe time sync request 
[groupId={}]", e, entry.getKey());
 
                 failureProcessor.process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
             }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index e355a6e20d..877006b201 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -335,7 +335,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 workerRegistry,
                 threadPoolsManager.commonScheduler(),
                 nettyBootstrapFactory,
-                workersConfiguration
+                workersConfiguration,
+                failureProcessor
         );
 
         var clusterSvc = new 
TestScaleCubeClusterServiceFactory().createClusterService(
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3714a4de87..9cc6b46176 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -465,7 +465,8 @@ public class IgniteImpl implements Ignite {
                 criticalWorkerRegistry,
                 threadPoolsManager.commonScheduler(),
                 nettyBootstrapFactory,
-                criticalWorkersConfiguration
+                criticalWorkersConfiguration,
+                failureProcessor
         );
 
         clusterSvc = new ScaleCubeClusterServiceFactory().createClusterService(

Reply via email to