This is an automated email from the ASF dual-hosted git repository.

Aias00 pushed a commit to branch 2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
commit 3747e33e7c3a1292917aa477144768ed7a4c617f
Author: liuhy <[email protected]>
AuthorDate: Fri Jun 5 14:26:19 2026 +0800

    fix(concurrency): fix race conditions and interrupt flag handling

    - CollectorJobScheduler: register response listener BEFORE sending message
      to prevent silent data loss when response arrives before registration;
      add finally block to always clean up listener from eventListeners map
    - ConsistentHash: add synchronized to addNode/removeNode/dispatchJob/
      preDispatchJob to prevent concurrent modification of hashCircle and
      existNodeMap during collector rebalancing
    - Metrics: replace synchronized(subTaskNum) with dedicated subTaskLock
      object to prevent NPE when AtomicInteger is null after deserialization
    - NettyRemotingAbstract: replace containsKey()+get() with single
      get()+null check to eliminate TOCTOU race with sendMsgSyncImpl's
      finally block; restore interrupt flag on InterruptedException
    - NettyRemotingClient/Server: restore Thread.interrupt() flag after
      catching InterruptedException to preserve shutdown signal

    Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
 .../hertzbeat/common/entity/job/Metrics.java | 5 +-
 .../manager/scheduler/CollectorJobScheduler.java | 73 +++++++++++++---------
 .../manager/scheduler/ConsistentHash.java | 8 +--
 .../remoting/netty/NettyRemotingAbstract.java | 10 +--
 .../remoting/netty/NettyRemotingClient.java | 1 +
 .../remoting/netty/NettyRemotingServer.java | 1 +
 6 files changed, 59 insertions(+), 39 deletions(-)

diff --git a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
index ca10145bc9..ed5aa17cd4 100644
--- a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
+++ b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
@@ -308,6 +308,9 @@ public class Metrics {
 /** * collector use - Temporarily store subTask
running num */ + @JsonIgnore + private final transient Object subTaskLock = new Object(); + @JsonIgnore private transient AtomicInteger subTaskNum; @@ -333,7 +336,7 @@ public class Metrics { * @return is last task? */ public boolean consumeSubTaskResponse(CollectRep.MetricsData metricsData) { - synchronized (subTaskNum) { + synchronized (subTaskLock) { int index = subTaskNum.decrementAndGet(); if (subTaskDataRef.get() == null) { subTaskDataRef.set(CollectRep.MetricsData.newBuilder(metricsData)); diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java index 915b549e1f..b1eb40e263 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java @@ -304,6 +304,18 @@ public class CollectorJobScheduler implements CollectorScheduling, CollectJobSch List<CollectRep.MetricsData> metricsData = new LinkedList<>(); CountDownLatch countDownLatch = new CountDownLatch(1); + // Register listener BEFORE sending to avoid race where response arrives before registration + CollectResponseEventListener listener = new CollectResponseEventListener() { + @Override + public void response(List<CollectRep.MetricsData> responseMetrics) { + if (responseMetrics != null) { + metricsData.addAll(responseMetrics); + } + countDownLatch.countDown(); + } + }; + eventListeners.put(job.getMonitorId(), listener); + ClusterMsg.Message message = ClusterMsg.Message.newBuilder() .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK) .setDirection(ClusterMsg.Direction.REQUEST) @@ -311,22 +323,17 @@ public class CollectorJobScheduler implements CollectorScheduling, CollectJobSch .build(); boolean result = this.manageServer.sendMsg(node.getIdentity(), message); - if (result) { - CollectResponseEventListener 
listener = new CollectResponseEventListener() { - @Override - public void response(List<CollectRep.MetricsData> responseMetrics) { - if (responseMetrics != null) { - metricsData.addAll(responseMetrics); - } - countDownLatch.countDown(); - } - }; - eventListeners.put(job.getMonitorId(), listener); - } try { - countDownLatch.await(120, TimeUnit.SECONDS); - } catch (Exception e) { - log.info("The sync task runs for 120 seconds with no response and returns"); + if (result) { + try { + countDownLatch.await(120, TimeUnit.SECONDS); + } catch (Exception e) { + log.info("The sync task runs for 120 seconds with no response and returns"); + } + } + } finally { + // Always remove listener to prevent memory leak + eventListeners.remove(job.getMonitorId()); } return metricsData; } @@ -347,29 +354,35 @@ public class CollectorJobScheduler implements CollectorScheduling, CollectJobSch return collectJobService.collectSyncJobData(job); } List<CollectRep.MetricsData> metricsData = new LinkedList<>(); + CountDownLatch countDownLatch = new CountDownLatch(1); + // Register listener BEFORE sending to avoid race where response arrives before registration + CollectResponseEventListener listener = new CollectResponseEventListener() { + @Override + public void response(List<CollectRep.MetricsData> responseMetrics) { + if (responseMetrics != null) { + metricsData.addAll(responseMetrics); + } + countDownLatch.countDown(); + } + }; + eventListeners.put(job.getMonitorId(), listener); ClusterMsg.Message message = ClusterMsg.Message.newBuilder() .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK) .setDirection(ClusterMsg.Direction.REQUEST) .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job))) .build(); boolean result = this.manageServer.sendMsg(node.getIdentity(), message); - if (result) { - CountDownLatch countDownLatch = new CountDownLatch(1); - CollectResponseEventListener listener = new CollectResponseEventListener() { - @Override - public void response(List<CollectRep.MetricsData> 
responseMetrics) { - if (responseMetrics != null) { - metricsData.addAll(responseMetrics); - } - countDownLatch.countDown(); + try { + if (result) { + try { + countDownLatch.await(120, TimeUnit.SECONDS); + } catch (Exception e) { + log.info("The sync task runs for 120 seconds with no response and returns"); } - }; - eventListeners.put(job.getMonitorId(), listener); - try { - countDownLatch.await(120, TimeUnit.SECONDS); - } catch (Exception e) { - log.info("The sync task runs for 120 seconds with no response and returns"); } + } finally { + // Always remove listener to prevent memory leak + eventListeners.remove(job.getMonitorId()); } return metricsData; } diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java index 44d1707aaf..e2873d6bd2 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java @@ -106,7 +106,7 @@ public class ConsistentHash { * add collector node * @param newNode node */ - public void addNode(Node newNode) { + public synchronized void addNode(Node newNode) { // when mode is cluster public, need reBalance dispatch jobs. else not when is cloud-edge private if (!CommonConstants.MODE_PRIVATE.equals(newNode.mode)) { byte virtualNodeNum = newNode.quality == null ? 
VIRTUAL_NODE_DEFAULT_SIZE : newNode.quality; @@ -160,7 +160,7 @@ public class ConsistentHash { * deleted collector node * @param name collector name */ - public Node removeNode(String name) { + public synchronized Node removeNode(String name) { Node deletedNode = existNodeMap.remove(name); if (deletedNode == null) { return null; @@ -247,7 +247,7 @@ public class ConsistentHash { * @param isFlushed is has flush this job or wait to dispatch * @return collector node */ - public Node dispatchJob(Integer dispatchHash, Long jobId, boolean isFlushed) { + public synchronized Node dispatchJob(Integer dispatchHash, Long jobId, boolean isFlushed) { if (dispatchHash == null || hashCircle == null || hashCircle.isEmpty()) { log.warn("There is no available collector registered. Cache the job {}.", jobId); dispatchJobCache.add(new DispatchJob(dispatchHash, jobId)); @@ -267,7 +267,7 @@ public class ConsistentHash { * @param dispatchHash The task route hash is collected * @return collector node */ - public Node preDispatchJob(Integer dispatchHash) { + public synchronized Node preDispatchJob(Integer dispatchHash) { if (dispatchHash == null || hashCircle == null || hashCircle.isEmpty()) { log.warn("There is no available collector registered."); return null; diff --git a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java index 3d9390f467..0e893a6427 100644 --- a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java +++ b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java @@ -87,9 +87,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { } protected void processResponseMsg(ChannelHandlerContext ctx, ClusterMsg.Message response) { - // for sync response - if (this.responseTable.containsKey(response.getIdentity())) { - ResponseFuture 
responseFuture = this.responseTable.get(response.getIdentity()); + // for sync response — use get() directly instead of containsKey()+get() + // to avoid TOCTOU race with sendMsgSyncImpl's finally block + ResponseFuture responseFuture = this.responseTable.get(response.getIdentity()); + if (responseFuture != null) { responseFuture.putResponse(response); } else { // async response @@ -99,7 +100,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { if (repMessage != null) { ctx.writeAndFlush(repMessage); } - } + } } } @@ -130,6 +131,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { return response; } catch (InterruptedException e) { log.warn("get response message failed, ", e); + Thread.currentThread().interrupt(); } finally { responseTable.remove(identity); } diff --git a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java index a01c7fab80..622a19dd24 100644 --- a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java +++ b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java @@ -112,6 +112,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti try { Thread.sleep(10000); } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); } } } diff --git a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java index 814977c98f..229cc5f380 100644 --- a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java +++ b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java @@ -120,6 +120,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract 
implements Remoti channel.closeFuture().sync(); } catch (InterruptedException ignored) { log.info("server shutdown now!"); + Thread.currentThread().interrupt(); } catch (Exception e) { log.error("Netty Server start exception, {}", e.getMessage()); throw new RuntimeException(e);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
