This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new 69802634d [bugfix] fix when manager restart, collect register error (#1896)
69802634d is described below

commit 69802634d4d71505458bdaebb89bb550de495e5f
Author: Ceilzcx <[email protected]>
AuthorDate: Wed May 1 20:38:28 2024 +0800

    [bugfix] fix when manager restart, collect register error (#1896)
    
    Co-authored-by: tomsun28 <[email protected]>
---
 ...obScheduler.java => CollectorJobScheduler.java} | 51 +++++++++++-----------
 .../hertzbeat/manager/scheduler/SchedulerInit.java |  8 ++--
 .../manager/scheduler/netty/ManageServer.java      | 22 +++++-----
 .../netty/process/HeartbeatProcessor.java          | 13 ++++--
 4 files changed, 50 insertions(+), 44 deletions(-)

diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorAndJobScheduler.java b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
similarity index 93%
rename from manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorAndJobScheduler.java
rename to manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
index 5acc10ada..c2c63dee0 100644
--- a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorAndJobScheduler.java
+++ b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
@@ -61,7 +61,7 @@ import org.springframework.util.StringUtils;
 @Component
 @AutoConfigureAfter(value = {SchedulerProperties.class})
 @Slf4j
-public class CollectorAndJobScheduler implements CollectorScheduling, 
CollectJobScheduling {
+public class CollectorJobScheduler implements CollectorScheduling, 
CollectJobScheduling {
 
     private final Map<Long, Job> jobContentCache = new ConcurrentHashMap<>(16);
 
@@ -102,7 +102,7 @@ public class CollectorAndJobScheduler implements CollectorScheduling, CollectJob
             collector.setStatus(CommonConstants.COLLECTOR_STATUS_ONLINE);
             if (collectorInfo != null) {
                 collector.setIp(collectorInfo.getIp());
-                collector.setMode(collectorInfo.getMode());   
+                collector.setMode(collectorInfo.getMode());
             }
         } else {
             if (collectorInfo == null) {
@@ -336,34 +336,33 @@ public class CollectorAndJobScheduler implements CollectorScheduling, CollectJob
         }
         if (CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) {
             return collectJobService.collectSyncJobData(job);
-        } else {
-            List<CollectRep.MetricsData> metricsData = new LinkedList<>();
-            ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                    .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
-                    .setDirection(ClusterMsg.Direction.REQUEST)
-                    .setMsg(JsonUtil.toJson(job))
-                    .build();
-            boolean result = this.manageServer.sendMsg(node.getIdentity(), 
message);
-            if (result) {
-                CountDownLatch countDownLatch = new CountDownLatch(1);
-                CollectResponseEventListener listener = new 
CollectResponseEventListener() {
-                    @Override
-                    public void response(List<CollectRep.MetricsData> 
responseMetrics) {
-                        if (responseMetrics != null) {
-                            metricsData.addAll(responseMetrics);
-                        }
-                        countDownLatch.countDown();
+        }
+        List<CollectRep.MetricsData> metricsData = new LinkedList<>();
+        ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
+                .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
+                .setDirection(ClusterMsg.Direction.REQUEST)
+                .setMsg(JsonUtil.toJson(job))
+                .build();
+        boolean result = this.manageServer.sendMsg(node.getIdentity(), 
message);
+        if (result) {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            CollectResponseEventListener listener = new 
CollectResponseEventListener() {
+                @Override
+                public void response(List<CollectRep.MetricsData> 
responseMetrics) {
+                    if (responseMetrics != null) {
+                        metricsData.addAll(responseMetrics);
                     }
-                };
-                eventListeners.put(job.getMonitorId(), listener);
-                try {
-                    countDownLatch.await(120, TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    log.info("The sync task runs for 120 seconds with no 
response and returns");
+                    countDownLatch.countDown();
                 }
+            };
+            eventListeners.put(job.getMonitorId(), listener);
+            try {
+                countDownLatch.await(120, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.info("The sync task runs for 120 seconds with no response 
and returns");
             }
-            return metricsData;
         }
+        return metricsData;
     }
 
     @Override
diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java
index 6acfefb98..29b1acc93 100644
--- a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java
+++ b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java
@@ -82,9 +82,9 @@ public class SchedulerInit implements CommandLineRunner {
         collectorDao.saveAll(collectors);
         // insert default consistent node
         CollectorInfo collectorInfo = CollectorInfo.builder()
-                                              
.name(CommonConstants.MAIN_COLLECTOR_NODE)
-                                              .ip(MAIN_COLLECTOR_NODE_IP)
-                                              .build();
+                .name(CommonConstants.MAIN_COLLECTOR_NODE)
+                .ip(MAIN_COLLECTOR_NODE_IP)
+                .build();
         
collectorScheduling.collectorGoOnline(CommonConstants.MAIN_COLLECTOR_NODE, 
collectorInfo);
         // init jobs
         List<Monitor> monitors = 
monitorDao.findMonitorsByStatusNotInAndAndJobIdNotNull(List.of((byte) 0));
@@ -109,7 +109,7 @@ public class SchedulerInit implements CommandLineRunner {
                                 param.getType())).collect(Collectors.toList());
                 List<ParamDefine> paramDefaultValue = 
appDefine.getParams().stream()
                         .filter(item -> 
StringUtils.hasText(item.getDefaultValue()))
-                        .collect(Collectors.toList());
+                        .toList();
                 paramDefaultValue.forEach(defaultVar -> {
                     if (configmaps.stream().noneMatch(item -> 
item.getKey().equals(defaultVar.getField()))) {
                         // todo type
diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
index 80194a03b..afafa4169 100644
--- a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
+++ b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.common.entity.message.ClusterMsg;
 import org.apache.hertzbeat.common.support.CommonThreadPool;
-import org.apache.hertzbeat.manager.scheduler.CollectorAndJobScheduler;
+import org.apache.hertzbeat.manager.scheduler.CollectorJobScheduler;
 import org.apache.hertzbeat.manager.scheduler.SchedulerProperties;
 import 
org.apache.hertzbeat.manager.scheduler.netty.process.CollectCyclicDataResponseProcessor;
 import 
org.apache.hertzbeat.manager.scheduler.netty.process.CollectOneTimeDataResponseProcessor;
@@ -53,7 +53,7 @@ import org.springframework.stereotype.Component;
 @Slf4j
 public class ManageServer implements CommandLineRunner {
 
-    private final CollectorAndJobScheduler collectorAndJobScheduler;
+    private final CollectorJobScheduler collectorJobScheduler;
 
     private ScheduledExecutorService channelSchedule;
 
@@ -62,10 +62,10 @@ public class ManageServer implements CommandLineRunner {
     private final Map<String, Channel> clientChannelTable = new 
ConcurrentHashMap<>(16);
 
     public ManageServer(final SchedulerProperties schedulerProperties,
-                        final CollectorAndJobScheduler 
collectorAndJobScheduler,
+                        final CollectorJobScheduler collectorJobScheduler,
                         final CommonThreadPool threadPool) {
-        this.collectorAndJobScheduler = collectorAndJobScheduler;
-        this.collectorAndJobScheduler.setManageServer(this);
+        this.collectorJobScheduler = collectorJobScheduler;
+        this.collectorJobScheduler.setManageServer(this);
         this.init(schedulerProperties, threadPool);
     }
 
@@ -95,7 +95,7 @@ public class ManageServer implements CommandLineRunner {
                     if (!channel.isActive()) {
                         channel.closeFuture();
                         this.clientChannelTable.remove(collector);
-                        
this.collectorAndJobScheduler.collectorGoOffline(collector);
+                        
this.collectorJobScheduler.collectorGoOffline(collector);
                     }
                 });   
             } catch (Exception e) {
@@ -110,8 +110,8 @@ public class ManageServer implements CommandLineRunner {
         this.channelSchedule.shutdownNow();
     }
 
-    public CollectorAndJobScheduler getCollectorAndJobScheduler() {
-        return collectorAndJobScheduler;
+    public CollectorJobScheduler getCollectorAndJobScheduler() {
+        return collectorJobScheduler;
     }
 
     public Channel getChannel(final String identity) {
@@ -134,7 +134,7 @@ public class ManageServer implements CommandLineRunner {
     public void closeChannel(final String identity) {
         Channel channel = this.getChannel(identity);
         if (channel != null) {
-            this.collectorAndJobScheduler.collectorGoOffline(identity);
+            this.collectorJobScheduler.collectorGoOffline(identity);
             ClusterMsg.Message message = 
ClusterMsg.Message.newBuilder().setType(ClusterMsg.MessageType.GO_CLOSE).build();
             this.remotingServer.sendMsg(channel, message);
             this.clientChannelTable.remove(identity);
@@ -142,7 +142,7 @@ public class ManageServer implements CommandLineRunner {
         }
     }
 
-    public boolean isChannelExist(final String identity) {
+    public boolean isChannelActive(final String identity) {
         Channel channel = this.clientChannelTable.get(identity);
         return channel != null && channel.isActive();
     }
@@ -185,7 +185,7 @@ public class ManageServer implements CommandLineRunner {
             }
             if (identity != null) {
                 ManageServer.this.clientChannelTable.remove(identity);
-                
ManageServer.this.collectorAndJobScheduler.collectorGoOffline(identity);
+                
ManageServer.this.collectorJobScheduler.collectorGoOffline(identity);
                 channel.close();
                 log.info("handle idle event triggered. the client {} is going 
offline.", identity);
             }
diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
index c5a12b597..8cf705027 100644
--- a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
+++ b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
@@ -38,9 +38,16 @@ public class HeartbeatProcessor implements NettyRemotingProcessor {
     @Override
     public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
         String identity = message.getIdentity();
-        boolean isChannelExist = this.manageServer.isChannelExist(identity);
-        if (!isChannelExist) {
-            log.info("the collector {} is not online.", identity);
+        boolean isChannelActive = this.manageServer.isChannelActive(identity);
+        if (!isChannelActive) {
+            this.manageServer.addChannel(identity, ctx.channel());
+            isChannelActive = this.manageServer.isChannelActive(identity);
+            if (!isChannelActive) {
+                log.info("the collector {} is not online.", identity);
+                return null;
+            } else {
+                
this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(identity, 
null);
+            }
         }
         if (log.isDebugEnabled()) {
             log.debug("server receive collector {} heartbeat", 
message.getIdentity());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to