rpuch commented on code in PR #7488:
URL: https://github.com/apache/ignite-3/pull/7488#discussion_r2746277750


##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -163,17 +172,22 @@ public DefaultMessagingService(
         this.marshaller = marshaller;
         this.criticalWorkerRegistry = criticalWorkerRegistry;
         this.failureProcessor = failureProcessor;
+        this.metricManager = metricManager;
 
         outboundExecutor = new CriticalSingleThreadExecutor(
-                IgniteMessageServiceThreadFactory.create(nodeName, 
"MessagingService-outbound", LOG, NOTHING_ALLOWED)

Review Comment:
   Could you please revert the change in this line? The original formatting 
made it easy to see where the argument is and where the closing paren is.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -163,17 +172,22 @@ public DefaultMessagingService(
         this.marshaller = marshaller;
         this.criticalWorkerRegistry = criticalWorkerRegistry;
         this.failureProcessor = failureProcessor;
+        this.metricManager = metricManager;
 
         outboundExecutor = new CriticalSingleThreadExecutor(
-                IgniteMessageServiceThreadFactory.create(nodeName, 
"MessagingService-outbound", LOG, NOTHING_ALLOWED)
-        );
+                IgniteMessageServiceThreadFactory.create(nodeName, 
"MessagingService-outbound", LOG, NOTHING_ALLOWED));
+        outboundExecutor.initMetricSource(metricManager, 
"network.messaging.default.executor.outbound",

Review Comment:
   What does 'default' stand for here?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingServiceMetricSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metric source for the {@link DefaultMessagingService}.
+ */
+class DefaultMessagingServiceMetricSource implements MetricSource {
+    /** Metrics map. Only modified in {@code synchronized} context. */
+    private final Map<String, Metric> metrics = new HashMap<>();
+
+    /** Enabled flag. Only modified in {@code synchronized} context. */
+    private boolean enabled;
+
+    @Override
+    public String name() {
+        return "network.messaging.default";

Review Comment:
   Why do we need this 'default'?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -726,10 +783,11 @@ public void stop() throws Exception {
     }
 
     // TODO: IGNITE-18493 - remove/move this
+
     /**
      * Installs a predicate, it will be consulted with for each message being 
sent; when it returns {@code true}, the
      * message will be dropped (it will not be sent; the corresponding future 
will time out soon for {@code invoke()} methods
-     * and will never complete for methods different from {@code invoke()}).
+     * and will never complete formethods different from {@code invoke()}).

Review Comment:
   ```suggestion
        * and will never complete for methods different from {@code invoke()}).
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -183,6 +197,12 @@ public DefaultMessagingService(
                 requestsMap,
                 failureProcessor
         );
+
+        metricSource = new DefaultMessagingServiceMetricSource();
+        metrics = new DefaultMessagingServiceMetrics(metricSource);
+
+        metricManager.registerSource(metricSource);
+        metricManager.enable(metricSource);

Review Comment:
   Should we register and enable the metric source on start, not in the 
constructor?
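
   For illustration, a minimal sketch of start-time registration. The types 
   here (`RecordingMetricManager`, `Service`) are hypothetical stand-ins, not the 
   real Ignite classes, and the source name is only a placeholder:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class LifecycleSketch {
       /** Hypothetical stand-in for a metric manager; records calls so their order is visible. */
       static class RecordingMetricManager {
           final List<String> calls = new ArrayList<>();

           void registerSource(String sourceName) {
               calls.add("register:" + sourceName);
           }

           void enable(String sourceName) {
               calls.add("enable:" + sourceName);
           }

           void unregisterSource(String sourceName) {
               calls.add("unregister:" + sourceName);
           }
       }

       /** Hypothetical service: the constructor only stores references, start() touches the manager. */
       static class Service {
           private final RecordingMetricManager metricManager;

           // Placeholder name, assumed for this sketch only.
           private final String metricSource = "network.messaging";

           Service(RecordingMetricManager metricManager) {
               this.metricManager = metricManager;
           }

           void start() {
               metricManager.registerSource(metricSource);
               metricManager.enable(metricSource);
           }

           void stop() {
               metricManager.unregisterSource(metricSource);
           }
       }

       public static void main(String[] args) {
           RecordingMetricManager manager = new RecordingMetricManager();
           Service service = new Service(manager);

           // Nothing has been registered by the constructor.
           System.out.println(manager.calls);

           service.start();
           service.stop();

           System.out.println(manager.calls);
       }
   }
   ```

   With this shape, a service that is constructed but never started leaves 
   nothing behind in the metric manager, and stop() mirrors start().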



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java:
##########
@@ -186,18 +188,38 @@ public void registerSource(MetricSource src) {
 
     @Override
     public void unregisterSource(MetricSource src) {
-        inBusyLockSafe(busyLock, () -> {
-            disable(src);
-            registry.unregisterSource(src);
-        });
+        try {
+            if (metricSources().contains(src)) {
+                inBusyLockSafe(busyLock, () -> {
+                    disable(src);
+                    registry.unregisterSource(src);
+                });
+            }
+        } catch (Exception e) {
+            Throwable rootEx = unwrapRootCause(e);
+
+            if (!(rootEx instanceof NodeStoppingException)) {
+                log.error("Failed to unregister metrics source {}", e, 
src.name());
+            }

Review Comment:
   Let's use `ExceptionUtils.hasCause()`.
   
   Also, how can an exception caused by a `NodeStoppingException` be thrown by 
this code?
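
   To show the difference between the two checks, here is a small 
   self-contained sketch. `hasCause` and `rootCause` below are local stand-ins 
   written for this example (not the Ignite utilities), and `StoppingException` 
   is a hypothetical placeholder for `NodeStoppingException`:

   ```java
   import java.io.IOException;

   public class HasCauseSketch {
       /** Hypothetical stand-in for NodeStoppingException. */
       static class StoppingException extends Exception {
           StoppingException(Throwable cause) {
               super(cause);
           }
       }

       /** Local stand-in helper: true if t, or anything in its cause chain, is an instance of type. */
       static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
           for (Throwable cur = t; cur != null; cur = cur.getCause()) {
               if (type.isInstance(cur)) {
                   return true;
               }
           }

           return false;
       }

       /** Walks to the deepest cause, which is what a root-cause check would inspect. */
       static Throwable rootCause(Throwable t) {
           Throwable cur = t;

           while (cur.getCause() != null) {
               cur = cur.getCause();
           }

           return cur;
       }

       public static void main(String[] args) {
           // The stopping exception sits in the middle of the chain, not at the root.
           Throwable e = new RuntimeException(new StoppingException(new IOException("disk")));

           System.out.println(hasCause(e, StoppingException.class)); // true
           System.out.println(rootCause(e) instanceof StoppingException); // false
       }
   }
   ```

   A root-cause check only matches when the exception of interest is the 
   deepest one, while a chain walk matches it at any depth.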



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingServiceMetrics.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+
+class DefaultMessagingServiceMetrics {
+    private final AtomicLongMetric messageSendFailures;
+
+    private final AtomicLongMetric messageRecipientNotFound;
+
+    private final AtomicLongMetric invokeRequestFailures;
+
+    private final AtomicLongMetric invokeResponseFailures;
+
+    private final AtomicLongMetric messageSerializationFailures;
+
+    private final AtomicLongMetric messageDeserializationFailures;
+
+    private final AtomicLongMetric connectionFailures;
+
+    private final AtomicLongMetric invokeTimeouts;
+
+    private final AtomicLongMetric slowResponses;

Review Comment:
   How is 'slow' defined?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -183,6 +197,12 @@ public DefaultMessagingService(
                 requestsMap,
                 failureProcessor
         );
+
+        metricSource = new DefaultMessagingServiceMetricSource();
+        metrics = new DefaultMessagingServiceMetrics(metricSource);

Review Comment:
   Let's initialize them in the field declarations.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingServiceMetrics.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+
+class DefaultMessagingServiceMetrics {
+    private final AtomicLongMetric messageSendFailures;

Review Comment:
   What's the source of the metrics defined here? Does this come from some 
issue/document/discussion?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingServiceMetrics.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+
+class DefaultMessagingServiceMetrics {
+    private final AtomicLongMetric messageSendFailures;
+
+    private final AtomicLongMetric messageRecipientNotFound;
+
+    private final AtomicLongMetric invokeRequestFailures;
+
+    private final AtomicLongMetric invokeResponseFailures;
+
+    private final AtomicLongMetric messageSerializationFailures;
+
+    private final AtomicLongMetric messageDeserializationFailures;

Review Comment:
   Why do we need those? Should we just invoke the failure processor if some 
message cannot be (de)serialized?



##########
modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java:
##########
@@ -26,16 +26,27 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricSource;
+import 
org.apache.ignite.internal.metrics.sources.StripedThreadPoolMetricSource;
 import org.apache.ignite.internal.thread.AbstractStripedThreadPoolExecutor;
 import org.apache.ignite.internal.thread.StripedExecutor;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Same as {@link StripedThreadPoolExecutor}, but each stripe is a critical 
worker monitored for being blocked.
  */
 public class CriticalStripedThreadPoolExecutor extends 
AbstractStripedThreadPoolExecutor<ExecutorService> implements StripedExecutor {
     private final List<CriticalWorker> workers;
 
+    @Nullable
+    private MetricManager metricManager;
+
+    @Nullable
+    private MetricSource metricSource;

Review Comment:
   Same question about nullability



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingServiceMetrics.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+
+class DefaultMessagingServiceMetrics {
+    private final AtomicLongMetric messageSendFailures;
+
+    private final AtomicLongMetric messageRecipientNotFound;
+
+    private final AtomicLongMetric invokeRequestFailures;
+
+    private final AtomicLongMetric invokeResponseFailures;

Review Comment:
   What are those?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingServiceMetrics.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+
+class DefaultMessagingServiceMetrics {
+    private final AtomicLongMetric messageSendFailures;
+
+    private final AtomicLongMetric messageRecipientNotFound;
+
+    private final AtomicLongMetric invokeRequestFailures;
+
+    private final AtomicLongMetric invokeResponseFailures;
+
+    private final AtomicLongMetric messageSerializationFailures;
+
+    private final AtomicLongMetric messageDeserializationFailures;
+
+    private final AtomicLongMetric connectionFailures;
+
+    private final AtomicLongMetric invokeTimeouts;
+
+    private final AtomicLongMetric slowResponses;
+
+    DefaultMessagingServiceMetrics(DefaultMessagingServiceMetricSource source) 
{
+        messageSendFailures = source.addMetric(new AtomicLongMetric(
+                "messageSendFailures",
+                "Total number of failed outgoing messages."
+        ));
+
+        messageRecipientNotFound = source.addMetric(new AtomicLongMetric(
+                "messageRecipientNotFound",
+                "Total number of message recipient resolution failures."
+        ));
+
+        invokeRequestFailures = source.addMetric(new AtomicLongMetric(
+                "invokeRequestFailures",
+                "Total number of failed outgoing request invocations."
+        ));
+
+        invokeResponseFailures = source.addMetric(new AtomicLongMetric(
+                "invokeResponseFailures",

Review Comment:
   The name of the metric made me think it was about failures when processing 
invoke responses. But it seems that it's about failures when sending such 
responses to other nodes. Is this true?



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java:
##########
@@ -186,18 +188,38 @@ public void registerSource(MetricSource src) {
 
     @Override
     public void unregisterSource(MetricSource src) {
-        inBusyLockSafe(busyLock, () -> {
-            disable(src);
-            registry.unregisterSource(src);
-        });
+        try {
+            if (metricSources().contains(src)) {
+                inBusyLockSafe(busyLock, () -> {
+                    disable(src);
+                    registry.unregisterSource(src);
+                });
+            }
+        } catch (Exception e) {
+            Throwable rootEx = unwrapRootCause(e);
+
+            if (!(rootEx instanceof NodeStoppingException)) {
+                log.error("Failed to unregister metrics source {}", e, 
src.name());
+            }
+        }
     }
 
     @Override
     public void unregisterSource(String srcName) {
-        inBusyLockSafe(busyLock, () -> {
-            disable(srcName);
-            registry.unregisterSource(srcName);
-        });
+        try {
+            if (metricSources().stream().anyMatch(metricSource -> 
metricSource.name().equals(srcName))) {
+                inBusyLockSafe(busyLock, () -> {
+                    disable(srcName);
+                    registry.unregisterSource(srcName);
+                });
+            }
+        } catch (Exception e) {

Review Comment:
   This pattern doesn't look great, and it also duplicates the method above. 
Let's discuss it in the earlier comment first and come back here once we are 
done there.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingServiceMetricSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metric source for the {@link DefaultMessagingService}.
+ */
+class DefaultMessagingServiceMetricSource implements MetricSource {
+    /** Metrics map. Only modified in {@code synchronized} context. */
+    private final Map<String, Metric> metrics = new HashMap<>();
+
+    /** Enabled flag. Only modified in {@code synchronized} context. */
+    private boolean enabled;
+
+    @Override
+    public String name() {
+        return "network.messaging.default";
+    }
+
+    @Override
+    public @Nullable String description() {
+        return "Metrics for the default messaging service.";

Review Comment:
   Oh, I think I get this 'default' thing. `Default` in the class name has no 
meaning for users, so we should not include this word in the metric names.



##########
modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java:
##########
@@ -93,4 +104,44 @@ private static CriticalSingleThreadExecutor[] 
createExecutors(
     public Collection<CriticalWorker> workers() {
         return workers;
     }
+
+    /**
+     * Initialize the metric source to track this thread pool's metrics.
+     *
+     * @param metricManager the metric manager used to register the source.
+     * @param name the name of the metric.
+     * @param description the metric description.

Review Comment:
   ```suggestion
        * @param metricManager The metric manager used to register the source.
        * @param name The name of the metric.
        * @param description The metric description.
   ```



##########
modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java:
##########
@@ -34,6 +40,9 @@ public class CriticalSingleThreadExecutor extends 
ThreadPoolExecutor implements
     private volatile Thread lastSeenThread;
     private volatile long heartbeatNanos = NOT_MONITORED;
 
+    private @Nullable MetricSource metricSource;
+    private @Nullable MetricManager metricManager;

Review Comment:
   Why are they nullable?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java:
##########
@@ -51,11 +53,15 @@ class CriticalStripedExecutors implements ManuallyCloseable 
{
             String poolNamePrefix,
             CriticalWorkerRegistry workerRegistry,
             ChannelTypeRegistry channelTypeRegistry,
-            IgniteLogger log
+            IgniteLogger log,
+            @Nullable MetricManager metricManager,

Review Comment:
   Why are they nullable?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java:
##########
@@ -43,18 +45,45 @@ class CriticalStripedThreadPoolExecutorFactory {
 
     private final List<CriticalWorker> registeredWorkers;
 
+    @Nullable
+    private final MetricManager metricManager;
+
+    @Nullable
+    private final String metricNamePrefix;
+
+    @Nullable
+    private final String metricDescription;

Review Comment:
   Why are they nullable?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -390,13 +420,17 @@ private CompletableFuture<Void> sendViaNetwork(
         try {
             descriptors = prepareMarshal(message);
         } catch (Exception e) {
+            metrics.incrementMessageSerializationFailures();
+
             return failedFuture(new IgniteException(INTERNAL_ERR, "Failed to 
marshal message: " + e.getMessage(), e));
         }
 
         return connectionManager.channel(nodeId, type, addr)
                 .thenComposeToCompletable(sender -> {
                     if (strictIdCheck && nodeId != null && 
!sender.launchId().equals(nodeId)) {
                         // The destination node has been rebooted, so it's a 
different node instance.
+                        metrics.incrementMessageSendFailures();

Review Comment:
   This code is not just about sends, but also about invocations



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -405,12 +439,20 @@ private CompletableFuture<Void> sendViaNetwork(
                             () -> triggerChannelCreation(nodeId, type, addr)
                     );
                 })
-                .whenComplete((res, ex) -> handleHandshakeError(ex, nodeId, 
type, addr));
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        metrics.incrementMessageSendFailures();
+                    }
+
+                    handleHandshakeError(ex, nodeId, type, addr);
+                });
     }
 
     private void handleHandshakeError(Throwable ex, UUID nodeId, ChannelType 
type, InetSocketAddress addr) {
         if (ex != null) {
             if (hasCause(ex, CriticalHandshakeException.class)) {
+                metrics.incrementConnectionFailures();

Review Comment:
   I'm not sure we even need a metric for this case as the case should be 
eliminated



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -651,7 +700,13 @@ private void onInvokeResponse(NetworkMessage response, 
Long correlationId) {
         TimeoutObjectImpl responseFuture = requestsMap.remove(correlationId);
 
         if (responseFuture != null) {
-            responseFuture.future().complete(response);
+            var fut = responseFuture.future();
+
+            if (fut.isCompletedExceptionally()) {
+                metrics.incrementInvokeTimeouts();
+            } else {
+                fut.complete(response);
+            }

Review Comment:
   There seems to be a race between checking and completing. Also, why is it 
about timeouts?
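
   One way to avoid the check-then-act race, as a sketch: 
   `CompletableFuture.complete()` returns `false` when the future has already 
   been completed, so the outcome of the attempt itself can drive the counter. 
   The class and the `lateResponses` counter below are hypothetical names for 
   this example only:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicLong;

   public class CompleteRaceSketch {
       /** Hypothetical counter of responses that arrived after the future was already completed. */
       static final AtomicLong lateResponses = new AtomicLong();

       static void onResponse(CompletableFuture<String> fut, String response) {
           // complete() atomically transitions the future; it returns false if the future
           // was already completed (normally, exceptionally, or by cancellation).
           if (!fut.complete(response)) {
               lateResponses.incrementAndGet();
           }
       }

       public static void main(String[] args) {
           CompletableFuture<String> timedOut = new CompletableFuture<>();
           timedOut.completeExceptionally(new TimeoutException("simulated timeout"));
           onResponse(timedOut, "late");

           CompletableFuture<String> pending = new CompletableFuture<>();
           onResponse(pending, "ok");

           System.out.println(lateResponses.get()); // 1
           System.out.println(pending.join()); // ok
       }
   }
   ```

   Note that this counts every response that loses the race, whatever 
   completed the future first. Telling a timeout apart from other failures 
   would need an extra look at the exception the future holds.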



##########
modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java:
##########
@@ -43,18 +45,45 @@ class CriticalStripedThreadPoolExecutorFactory {
 
     private final List<CriticalWorker> registeredWorkers;
 
+    @Nullable
+    private final MetricManager metricManager;
+
+    @Nullable
+    private final String metricNamePrefix;
+
+    @Nullable
+    private final String metricDescription;
+
+    @SuppressWarnings("unused")

Review Comment:
   Why is this suppression needed?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -163,17 +172,22 @@ public DefaultMessagingService(
         this.marshaller = marshaller;
         this.criticalWorkerRegistry = criticalWorkerRegistry;
         this.failureProcessor = failureProcessor;
+        this.metricManager = metricManager;
 
         outboundExecutor = new CriticalSingleThreadExecutor(
-                IgniteMessageServiceThreadFactory.create(nodeName, 
"MessagingService-outbound", LOG, NOTHING_ALLOWED)
-        );
+                IgniteMessageServiceThreadFactory.create(nodeName, 
"MessagingService-outbound", LOG, NOTHING_ALLOWED));
+        outboundExecutor.initMetricSource(metricManager, 
"network.messaging.default.executor.outbound",
+                "Outbound message executor metrics");
 
         inboundExecutors = new CriticalStripedExecutors(
                 nodeName,
                 "MessagingService-inbound",
                 criticalWorkerRegistry,
                 channelTypeRegistry,
-                LOG
+                LOG,
+                metricManager,
+                "network.messaging.default.executor.inbound",

Review Comment:
   Please remove 'default'



##########
modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java:
##########
@@ -44,6 +53,24 @@ public CriticalSingleThreadExecutor(long keepAliveTime, 
TimeUnit unit, BlockingQ
         super(1, 1, keepAliveTime, unit, workQueue, threadFactory);
     }
 
+    /**
+     * Initialize the metric source to track this thread pool's metrics.
+     *
+     * @param metricManager the metric manager used to register the source.
+     * @param name the name of the metric.
+     * @param description the metric description.

Review Comment:
   ```suggestion
        * @param metricManager The metric manager used to register the source.
        * @param name The name of the metric.
        * @param description The metric description.
   ```



##########
modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java:
##########
@@ -44,6 +53,24 @@ public CriticalSingleThreadExecutor(long keepAliveTime, 
TimeUnit unit, BlockingQ
         super(1, 1, keepAliveTime, unit, workQueue, threadFactory);
     }
 
+    /**
+     * Initialize the metric source to track this thread pool's metrics.
+     *
+     * @param metricManager the metric manager used to register the source.
+     * @param name the name of the metric.
+     * @param description the metric description.
+     */
+    public void initMetricSource(MetricManager metricManager, String name, 
String description) {

Review Comment:
   How about doing this in the constructor?
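
   A rough sketch of what constructor initialization could look like, using 
   hypothetical stand-in types (`MetricManagerStub`, `Executor`) rather than the 
   real classes. The fields become `final` and never null, so the `@Nullable` 
   annotations and null checks are no longer needed:

   ```java
   import java.util.Objects;

   public class ConstructorInitSketch {
       /** Hypothetical stand-in for the metric manager. */
       interface MetricManagerStub {
           void register(String sourceName);
       }

       /** Hypothetical executor that receives its metric dependencies at construction time. */
       static class Executor {
           private final MetricManagerStub metricManager;
           private final String metricSourceName;

           Executor(MetricManagerStub metricManager, String metricSourceName) {
               // Fail fast instead of carrying nullable fields around.
               this.metricManager = Objects.requireNonNull(metricManager, "metricManager");
               this.metricSourceName = Objects.requireNonNull(metricSourceName, "metricSourceName");
           }

           String metricSourceName() {
               return metricSourceName;
           }
       }

       public static void main(String[] args) {
           Executor executor = new Executor(name -> { }, "executor.outbound");

           System.out.println(executor.metricSourceName());
       }
   }
   ```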



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -405,12 +439,20 @@ private CompletableFuture<Void> sendViaNetwork(
                             () -> triggerChannelCreation(nodeId, type, addr)
                     );
                 })
-                .whenComplete((res, ex) -> handleHandshakeError(ex, nodeId, 
type, addr));
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        metrics.incrementMessageSendFailures();

Review Comment:
   This code is not just about sends, but also about invocations



##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -240,6 +262,8 @@ public CompletableFuture<Void> respond(String 
recipientConsistentId, ChannelType
         InternalClusterNode recipient = 
topologyService.getByConsistentId(recipientConsistentId);
 
         if (recipient == null) {
+            metrics.incrementMessageRecipientNotFound();

Review Comment:
   Why does `recipient == null` lead to incrementing different metrics 
('messageSendFailures' there, but 'messageRecipientNotFound' here)?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java:
##########
@@ -64,6 +93,12 @@ CriticalStripedThreadPoolExecutor create(ChannelType 
channelType) {
         var threadFactory = IgniteMessageServiceThreadFactory.create(nodeName, 
poolName, log, NOTHING_ALLOWED);
         var executor = new 
CriticalStripedThreadPoolExecutor(stripeCountForIndex(channelTypeId), 
threadFactory, false, 0);
 
+        if (metricManager != null && metricNamePrefix != null) {
+            String metricName = String.format("%s.%s", metricNamePrefix, 
channelType.name());
+
+            executor.initMetricSource(metricManager, metricName, 
metricDescription);

Review Comment:
   Can we init metric source on executor creation (in its constructor)?



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java:
##########
@@ -186,18 +188,38 @@ public void registerSource(MetricSource src) {
 
     @Override
     public void unregisterSource(MetricSource src) {
-        inBusyLockSafe(busyLock, () -> {
-            disable(src);
-            registry.unregisterSource(src);
-        });
+        try {
+            if (metricSources().contains(src)) {

Review Comment:
   Why is this change needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
