This is an automated email from the ASF dual-hosted git repository.

Aias00 pushed a commit to branch 2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git

commit 3747e33e7c3a1292917aa477144768ed7a4c617f
Author: liuhy <[email protected]>
AuthorDate: Fri Jun 5 14:26:19 2026 +0800

    fix(concurrency): fix race conditions and interrupt flag handling
    
    - CollectorJobScheduler: register the response listener BEFORE sending
      the message, so a response that arrives before registration is no
      longer silently dropped; add a finally block that always removes the
      listener from the eventListeners map
    - ConsistentHash: add synchronized to addNode/removeNode/dispatchJob/
      preDispatchJob to prevent concurrent modification of hashCircle and
      existNodeMap during collector rebalancing
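    - Metrics: replace synchronized(subTaskNum) with a dedicated final
      subTaskLock object, so the monitor is no longer the non-final
      subTaskNum field, which may be null (e.g. after deserialization) or be
      reassigned to a different AtomicInteger. Note that
      consumeSubTaskResponse still calls subTaskNum.decrementAndGet(), so
      subTaskNum must still be initialized before that method runs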
    - NettyRemotingAbstract: replace containsKey()+get() with a single
      get() plus null check to eliminate the TOCTOU race with
      sendMsgSyncImpl's finally block; restore the interrupt flag on
      InterruptedException
    - NettyRemotingClient/Server: restore the thread's interrupt flag via
      Thread.currentThread().interrupt() after catching
      InterruptedException, to preserve the shutdown signal
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
 .../hertzbeat/common/entity/job/Metrics.java       |  5 +-
 .../manager/scheduler/CollectorJobScheduler.java   | 73 +++++++++++++---------
 .../manager/scheduler/ConsistentHash.java          |  8 +--
 .../remoting/netty/NettyRemotingAbstract.java      | 10 +--
 .../remoting/netty/NettyRemotingClient.java        |  1 +
 .../remoting/netty/NettyRemotingServer.java        |  1 +
 6 files changed, 59 insertions(+), 39 deletions(-)

diff --git 
a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
 
b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
index ca10145bc9..ed5aa17cd4 100644
--- 
a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
+++ 
b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
@@ -308,6 +308,9 @@ public class Metrics {
     /**
      * collector use - Temporarily store subTask running num
      */
+    @JsonIgnore
+    private final transient Object subTaskLock = new Object();
+
     @JsonIgnore
     private transient AtomicInteger subTaskNum;
 
@@ -333,7 +336,7 @@ public class Metrics {
      * @return is last task?
      */
     public boolean consumeSubTaskResponse(CollectRep.MetricsData metricsData) {
-        synchronized (subTaskNum) {
+        synchronized (subTaskLock) {
             int index = subTaskNum.decrementAndGet();
             if (subTaskDataRef.get() == null) {
                 
subTaskDataRef.set(CollectRep.MetricsData.newBuilder(metricsData));
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
index 915b549e1f..b1eb40e263 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
@@ -304,6 +304,18 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
             List<CollectRep.MetricsData> metricsData = new LinkedList<>();
             CountDownLatch countDownLatch = new CountDownLatch(1);
 
+            // Register listener BEFORE sending to avoid race where response 
arrives before registration
+            CollectResponseEventListener listener = new 
CollectResponseEventListener() {
+                @Override
+                public void response(List<CollectRep.MetricsData> 
responseMetrics) {
+                    if (responseMetrics != null) {
+                        metricsData.addAll(responseMetrics);
+                    }
+                    countDownLatch.countDown();
+                }
+            };
+            eventListeners.put(job.getMonitorId(), listener);
+
             ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
                     .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
                     .setDirection(ClusterMsg.Direction.REQUEST)
@@ -311,22 +323,17 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
                     .build();
             boolean result = this.manageServer.sendMsg(node.getIdentity(), 
message);
 
-            if (result) {
-                CollectResponseEventListener listener = new 
CollectResponseEventListener() {
-                    @Override
-                    public void response(List<CollectRep.MetricsData> 
responseMetrics) {
-                        if (responseMetrics != null) {
-                            metricsData.addAll(responseMetrics);
-                        }
-                        countDownLatch.countDown();
-                    }
-                };
-                eventListeners.put(job.getMonitorId(), listener);
-            }
             try {
-                countDownLatch.await(120, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.info("The sync task runs for 120 seconds with no response 
and returns");
+                if (result) {
+                    try {
+                        countDownLatch.await(120, TimeUnit.SECONDS);
+                    } catch (Exception e) {
+                        log.info("The sync task runs for 120 seconds with no 
response and returns");
+                    }
+                }
+            } finally {
+                // Always remove listener to prevent memory leak
+                eventListeners.remove(job.getMonitorId());
             }
             return metricsData;
         }
@@ -347,29 +354,35 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
             return collectJobService.collectSyncJobData(job);
         }
         List<CollectRep.MetricsData> metricsData = new LinkedList<>();
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        // Register listener BEFORE sending to avoid race where response 
arrives before registration
+        CollectResponseEventListener listener = new 
CollectResponseEventListener() {
+            @Override
+            public void response(List<CollectRep.MetricsData> responseMetrics) 
{
+                if (responseMetrics != null) {
+                    metricsData.addAll(responseMetrics);
+                }
+                countDownLatch.countDown();
+            }
+        };
+        eventListeners.put(job.getMonitorId(), listener);
         ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
                 .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
                 .setDirection(ClusterMsg.Direction.REQUEST)
                 .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
                 .build();
         boolean result = this.manageServer.sendMsg(node.getIdentity(), 
message);
-        if (result) {
-            CountDownLatch countDownLatch = new CountDownLatch(1);
-            CollectResponseEventListener listener = new 
CollectResponseEventListener() {
-                @Override
-                public void response(List<CollectRep.MetricsData> 
responseMetrics) {
-                    if (responseMetrics != null) {
-                        metricsData.addAll(responseMetrics);
-                    }
-                    countDownLatch.countDown();
+        try {
+            if (result) {
+                try {
+                    countDownLatch.await(120, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    log.info("The sync task runs for 120 seconds with no 
response and returns");
                 }
-            };
-            eventListeners.put(job.getMonitorId(), listener);
-            try {
-                countDownLatch.await(120, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.info("The sync task runs for 120 seconds with no response 
and returns");
             }
+        } finally {
+            // Always remove listener to prevent memory leak
+            eventListeners.remove(job.getMonitorId());
         }
         return metricsData;
     }
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java
index 44d1707aaf..e2873d6bd2 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java
@@ -106,7 +106,7 @@ public class ConsistentHash {
      * add collector node
      * @param newNode node
      */
-    public void addNode(Node newNode) {
+    public synchronized void addNode(Node newNode) {
         // when mode is cluster public, need reBalance dispatch jobs. else not 
when is cloud-edge private
         if (!CommonConstants.MODE_PRIVATE.equals(newNode.mode)) {
             byte virtualNodeNum = newNode.quality == null ? 
VIRTUAL_NODE_DEFAULT_SIZE : newNode.quality;
@@ -160,7 +160,7 @@ public class ConsistentHash {
      * deleted collector node
      * @param name collector name
      */
-    public Node removeNode(String name) {
+    public synchronized Node removeNode(String name) {
         Node deletedNode = existNodeMap.remove(name);
         if (deletedNode == null) {
             return null;
@@ -247,7 +247,7 @@ public class ConsistentHash {
      * @param isFlushed is has flush this job or wait to dispatch
      * @return collector node
      */
-    public Node dispatchJob(Integer dispatchHash, Long jobId, boolean 
isFlushed) {
+    public synchronized Node dispatchJob(Integer dispatchHash, Long jobId, 
boolean isFlushed) {
         if (dispatchHash == null || hashCircle == null || 
hashCircle.isEmpty()) {
             log.warn("There is no available collector registered. Cache the 
job {}.", jobId);
             dispatchJobCache.add(new DispatchJob(dispatchHash, jobId));
@@ -267,7 +267,7 @@ public class ConsistentHash {
      * @param dispatchHash The task route hash is collected
      * @return collector node
      */
-    public Node preDispatchJob(Integer dispatchHash) {
+    public synchronized Node preDispatchJob(Integer dispatchHash) {
         if (dispatchHash == null || hashCircle == null || 
hashCircle.isEmpty()) {
             log.warn("There is no available collector registered.");
             return null;
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
index 3d9390f467..0e893a6427 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
@@ -87,9 +87,10 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
     }
 
     protected void processResponseMsg(ChannelHandlerContext ctx, 
ClusterMsg.Message response) {
-        // for sync response
-        if (this.responseTable.containsKey(response.getIdentity())) {
-            ResponseFuture responseFuture = 
this.responseTable.get(response.getIdentity());
+        // for sync response — use get() directly instead of 
containsKey()+get()
+        // to avoid TOCTOU race with sendMsgSyncImpl's finally block
+        ResponseFuture responseFuture = 
this.responseTable.get(response.getIdentity());
+        if (responseFuture != null) {
             responseFuture.putResponse(response);
         } else {
             // async response
@@ -99,7 +100,7 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
                 if (repMessage != null) {
                     ctx.writeAndFlush(repMessage);
                 }
-            }   
+            }
         }
     }
 
@@ -130,6 +131,7 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
             return response;
         } catch (InterruptedException e) {
             log.warn("get response message failed, ", e);
+            Thread.currentThread().interrupt();
         } finally {
             responseTable.remove(identity);
         }
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
index a01c7fab80..622a19dd24 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
@@ -112,6 +112,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     try {
                         Thread.sleep(10000);
                     } catch (InterruptedException ignored) {
+                        Thread.currentThread().interrupt();
                     }
                 }
             }
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
index 814977c98f..229cc5f380 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
@@ -120,6 +120,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 channel.closeFuture().sync();
             } catch (InterruptedException ignored) {
                 log.info("server shutdown now!");
+                Thread.currentThread().interrupt();
             } catch (Exception e) {
                 log.error("Netty Server start exception, {}", e.getMessage());
                 throw new RuntimeException(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to