This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new 120d494c47 [fix]Fixed private collector tasks not taking effect (#3914)
120d494c47 is described below

commit 120d494c471a0e2c28aeaf173a005b401cf32f64
Author: Duansg <[email protected]>
AuthorDate: Tue Dec 16 10:51:39 2025 +0800

    [fix]Fixed private collector tasks not taking effect (#3914)
---
 .../entrance/processor/GoOnlineProcessor.java      |   8 +-
 .../entrance/processor/GoOnlineProcessorTest.java  | 105 +++++++++++++++++++++
 2 files changed, 110 insertions(+), 3 deletions(-)

diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
index 34dda15197..acebb42e3a 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
@@ -35,9 +35,9 @@ import 
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
  */
 @Slf4j
 public class GoOnlineProcessor implements NettyRemotingProcessor {
-    
+
     private TimerDispatch timerDispatch;
-    
+
     @Override
     public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
         if (this.timerDispatch == null) {
@@ -53,7 +53,9 @@ public class GoOnlineProcessor implements 
NettyRemotingProcessor {
                 AesUtil.setDefaultSecretKey(serverInfo.getAesSecret());
             }
         }
-        timerDispatch.goOnline();
+        if (ClusterMsg.Direction.REQUEST.equals(message.getDirection())) {
+            timerDispatch.goOnline();
+        }
         log.info("receive online message and handle success");
         return ClusterMsg.Message.newBuilder()
                 .setIdentity(message.getIdentity())
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessorTest.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessorTest.java
new file mode 100644
index 0000000000..d43de3e97d
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.dispatch.entrance.processor;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.hertzbeat.collector.timer.TimerDispatch;
+import org.apache.hertzbeat.collector.timer.TimerDispatcher;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.support.SpringContextHolder;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test for GoOnlineProcessor
+ */
+class GoOnlineProcessorTest {
+
+    private GoOnlineProcessor goOnlineProcessor;
+    private TimerDispatcher timerDispatcher;
+
+    @Mock
+    private ChannelHandlerContext channelHandlerContext;
+
+    private MockedStatic<SpringContextHolder> springContextHolderMockedStatic;
+
+    @BeforeEach
+    void setUp() {
+        MockitoAnnotations.openMocks(this);
+        goOnlineProcessor = new GoOnlineProcessor();
+        timerDispatcher = new TimerDispatcher();
+        springContextHolderMockedStatic = 
Mockito.mockStatic(SpringContextHolder.class);
+        springContextHolderMockedStatic.when(() -> 
SpringContextHolder.getBean(TimerDispatch.class)).thenReturn(timerDispatcher);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        springContextHolderMockedStatic.close();
+        timerDispatcher.destroy();
+    }
+
+    @Test
+    void verifyTaskMapPreservation() throws Exception {
+        Job job = Job.builder()
+            .app("test")
+            .id(12345L)
+            
.metrics(Lists.newArrayList(Metrics.builder().interval(100L).build()))
+            .configmap(Lists.newArrayList())
+            .isCyclic(true)
+            .build();
+        timerDispatcher.addJob(job, null);
+
+        Field cyclicTaskMapField = 
TimerDispatcher.class.getDeclaredField("currentCyclicTaskMap");
+        cyclicTaskMapField.setAccessible(true);
+        Map<?, ?> currentCyclicTaskMap = (Map<?, ?>) 
cyclicTaskMapField.get(timerDispatcher);
+        assertEquals(1, currentCyclicTaskMap.size(), "Task map should have 1 
job initially");
+
+        ClusterMsg.Message responseMsg = ClusterMsg.Message.newBuilder()
+            .setType(ClusterMsg.MessageType.GO_ONLINE)
+            .setDirection(ClusterMsg.Direction.RESPONSE)
+            .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+            .setIdentity("test-identity")
+            .build();
+        goOnlineProcessor.handle(channelHandlerContext, responseMsg);
+        assertEquals(1, currentCyclicTaskMap.size(), "Task map should still 
have 1 job after receiving RESPONSE");
+
+        ClusterMsg.Message requestMsg = ClusterMsg.Message.newBuilder()
+            .setType(ClusterMsg.MessageType.GO_ONLINE)
+            .setDirection(ClusterMsg.Direction.REQUEST)
+            .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+            .setIdentity("test-identity")
+            .build();
+        goOnlineProcessor.handle(channelHandlerContext, requestMsg);
+        assertEquals(0, currentCyclicTaskMap.size(), "Task map should be empty 
after receiving REQUEST");
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to