This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new ea454d40b4 IGNITE-22045 Fix ItComputeApiThreadingTest (#3607) ea454d40b4 is described below commit ea454d40b4ea150a357228aae5edc53308a1eb52 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Mon Apr 15 19:11:10 2024 +0400 IGNITE-22045 Fix ItComputeApiThreadingTest (#3607) --- .../threading/ItComputeApiThreadingTest.java | 2 +- .../internal/network/NettyWorkersRegistrar.java | 31 ++++++++++++++++++---- .../ignite/internal/replicator/ReplicaManager.java | 4 +-- .../runner/app/ItIgniteNodeRestartTest.java | 3 ++- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 ++- 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java index 287db4381d..1d8ac1d95c 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java @@ -99,7 +99,7 @@ class ItComputeApiThreadingTest extends ClusterPerClassIntegrationTest { CompletableFuture<Thread> completerFuture = operation.executeOn(compute) .thenApply(unused -> currentThread()); - assertThat(completerFuture, willBe(anIgniteThread())); + assertThat(completerFuture, willBe(either(is(currentThread())).or(anIgniteThread()))); } private static IgniteCompute computeForInternalUse() { diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java index d164cbd74a..d9246f24cc 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java @@ -29,6 +29,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import org.apache.ignite.internal.failure.FailureContext; +import org.apache.ignite.internal.failure.FailureProcessor; +import org.apache.ignite.internal.failure.FailureType; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.worker.CriticalWorker; import org.apache.ignite.internal.worker.CriticalWorkerRegistry; @@ -39,6 +44,8 @@ import org.jetbrains.annotations.Nullable; * This component is responsible for registering Netty workers with the {@link CriticalWorkerRegistry} and for updating their heartbeats. */ public class NettyWorkersRegistrar implements IgniteComponent { + private static final IgniteLogger LOG = Loggers.forClass(NettyWorkersRegistrar.class); + /* * It seems impossible to instrument tasks executed by Netty event loops, so the strategy we use is to * send 'heartbeat' tasks to each of the event loops periodically; these tasks just update the heartbeat timestamp @@ -54,6 +61,8 @@ public class NettyWorkersRegistrar implements IgniteComponent { private final CriticalWorkersConfiguration criticalWorkersConfiguration; + private final FailureProcessor failureProcessor; + private volatile List<NettyWorker> workers; @Nullable @@ -66,17 +75,20 @@ public class NettyWorkersRegistrar implements IgniteComponent { * blocked. * @param scheduler Used to schedule periodic tasks. * @param bootstrapFactory Used to obtain Netty workers. + * @param failureProcessor Used to process failures. */ public NettyWorkersRegistrar( CriticalWorkerRegistry criticalWorkerRegistry, ScheduledExecutorService scheduler, NettyBootstrapFactory bootstrapFactory, - CriticalWorkersConfiguration criticalWorkersConfiguration + CriticalWorkersConfiguration criticalWorkersConfiguration, + FailureProcessor failureProcessor ) { this.criticalWorkerRegistry = criticalWorkerRegistry; this.scheduler = scheduler; this.bootstrapFactory = bootstrapFactory; this.criticalWorkersConfiguration = criticalWorkersConfiguration; + this.failureProcessor = failureProcessor; } @Override @@ -109,12 +121,21 @@ public class NettyWorkersRegistrar implements IgniteComponent { } private void sendHearbeats() { - try { - for (NettyWorker worker : workers) { + for (NettyWorker worker : workers) { + try { worker.sendHeartbeat(); + } catch (Exception | AssertionError e) { + LOG.warn("Cannot send a heartbeat to a Netty thread [threadId={}].", e, worker.threadId()); + } catch (Error e) { + LOG.error( + "Cannot send a heartbeat to a Netty thread, no more heartbeats will be sent [threadId={}].", + e, worker.threadId() + ); + + failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + + throw e; } - } catch (Exception | AssertionError ignored) { - // Ignore. } } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index ff1280f818..883fa59647 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -777,9 +777,9 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc try { sendSafeTimeSyncIfReplicaReady(entry.getValue()); } catch (Exception | AssertionError e) { - LOG.warn("Error while trying to send a safe time sync request to {}", e, entry.getKey()); + LOG.warn("Error while trying to send a safe time sync request [groupId={}]", e, entry.getKey()); } catch (Error e) { - LOG.error("Error while trying to send a safe time sync request to {}", e, entry.getKey()); + LOG.error("Error while trying to send a safe time sync request [groupId={}]", e, entry.getKey()); failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e)); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index e355a6e20d..877006b201 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -335,7 +335,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { workerRegistry, threadPoolsManager.commonScheduler(), nettyBootstrapFactory, - workersConfiguration + workersConfiguration, + failureProcessor ); var clusterSvc = new TestScaleCubeClusterServiceFactory().createClusterService( diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 3714a4de87..9cc6b46176 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -465,7 +465,8 @@ public class IgniteImpl implements Ignite { criticalWorkerRegistry, threadPoolsManager.commonScheduler(), nettyBootstrapFactory, - criticalWorkersConfiguration + criticalWorkersConfiguration, + failureProcessor ); clusterSvc = new ScaleCubeClusterServiceFactory().createClusterService(