This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 69802634d [bugfix] fix collector registration error when the manager restarts (#1896)
69802634d is described below
commit 69802634d4d71505458bdaebb89bb550de495e5f
Author: Ceilzcx <[email protected]>
AuthorDate: Wed May 1 20:38:28 2024 +0800
[bugfix] fix collector registration error when the manager restarts (#1896)
Co-authored-by: tomsun28 <[email protected]>
---
...obScheduler.java => CollectorJobScheduler.java} | 51 +++++++++++-----------
.../hertzbeat/manager/scheduler/SchedulerInit.java | 8 ++--
.../manager/scheduler/netty/ManageServer.java | 22 +++++-----
.../netty/process/HeartbeatProcessor.java | 13 ++++--
4 files changed, 50 insertions(+), 44 deletions(-)
diff --git
a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorAndJobScheduler.java
b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
similarity index 93%
rename from
manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorAndJobScheduler.java
rename to
manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
index 5acc10ada..c2c63dee0 100644
---
a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorAndJobScheduler.java
+++
b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
@@ -61,7 +61,7 @@ import org.springframework.util.StringUtils;
@Component
@AutoConfigureAfter(value = {SchedulerProperties.class})
@Slf4j
-public class CollectorAndJobScheduler implements CollectorScheduling,
CollectJobScheduling {
+public class CollectorJobScheduler implements CollectorScheduling,
CollectJobScheduling {
private final Map<Long, Job> jobContentCache = new ConcurrentHashMap<>(16);
@@ -102,7 +102,7 @@ public class CollectorAndJobScheduler implements
CollectorScheduling, CollectJob
collector.setStatus(CommonConstants.COLLECTOR_STATUS_ONLINE);
if (collectorInfo != null) {
collector.setIp(collectorInfo.getIp());
- collector.setMode(collectorInfo.getMode());
+ collector.setMode(collectorInfo.getMode());
}
} else {
if (collectorInfo == null) {
@@ -336,34 +336,33 @@ public class CollectorAndJobScheduler implements
CollectorScheduling, CollectJob
}
if (CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) {
return collectJobService.collectSyncJobData(job);
- } else {
- List<CollectRep.MetricsData> metricsData = new LinkedList<>();
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setMsg(JsonUtil.toJson(job))
- .build();
- boolean result = this.manageServer.sendMsg(node.getIdentity(),
message);
- if (result) {
- CountDownLatch countDownLatch = new CountDownLatch(1);
- CollectResponseEventListener listener = new
CollectResponseEventListener() {
- @Override
- public void response(List<CollectRep.MetricsData>
responseMetrics) {
- if (responseMetrics != null) {
- metricsData.addAll(responseMetrics);
- }
- countDownLatch.countDown();
+ }
+ List<CollectRep.MetricsData> metricsData = new LinkedList<>();
+ ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
+ .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
+ .setDirection(ClusterMsg.Direction.REQUEST)
+ .setMsg(JsonUtil.toJson(job))
+ .build();
+ boolean result = this.manageServer.sendMsg(node.getIdentity(),
message);
+ if (result) {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ CollectResponseEventListener listener = new
CollectResponseEventListener() {
+ @Override
+ public void response(List<CollectRep.MetricsData>
responseMetrics) {
+ if (responseMetrics != null) {
+ metricsData.addAll(responseMetrics);
}
- };
- eventListeners.put(job.getMonitorId(), listener);
- try {
- countDownLatch.await(120, TimeUnit.SECONDS);
- } catch (Exception e) {
- log.info("The sync task runs for 120 seconds with no
response and returns");
+ countDownLatch.countDown();
}
+ };
+ eventListeners.put(job.getMonitorId(), listener);
+ try {
+ countDownLatch.await(120, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.info("The sync task runs for 120 seconds with no response
and returns");
}
- return metricsData;
}
+ return metricsData;
}
@Override
diff --git
a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java
b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java
index 6acfefb98..29b1acc93 100644
---
a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java
+++
b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java
@@ -82,9 +82,9 @@ public class SchedulerInit implements CommandLineRunner {
collectorDao.saveAll(collectors);
// insert default consistent node
CollectorInfo collectorInfo = CollectorInfo.builder()
-
.name(CommonConstants.MAIN_COLLECTOR_NODE)
- .ip(MAIN_COLLECTOR_NODE_IP)
- .build();
+ .name(CommonConstants.MAIN_COLLECTOR_NODE)
+ .ip(MAIN_COLLECTOR_NODE_IP)
+ .build();
collectorScheduling.collectorGoOnline(CommonConstants.MAIN_COLLECTOR_NODE,
collectorInfo);
// init jobs
List<Monitor> monitors =
monitorDao.findMonitorsByStatusNotInAndAndJobIdNotNull(List.of((byte) 0));
@@ -109,7 +109,7 @@ public class SchedulerInit implements CommandLineRunner {
param.getType())).collect(Collectors.toList());
List<ParamDefine> paramDefaultValue =
appDefine.getParams().stream()
.filter(item ->
StringUtils.hasText(item.getDefaultValue()))
- .collect(Collectors.toList());
+ .toList();
paramDefaultValue.forEach(defaultVar -> {
if (configmaps.stream().noneMatch(item ->
item.getKey().equals(defaultVar.getField()))) {
// todo type
diff --git
a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
index 80194a03b..afafa4169 100644
---
a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
+++
b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.message.ClusterMsg;
import org.apache.hertzbeat.common.support.CommonThreadPool;
-import org.apache.hertzbeat.manager.scheduler.CollectorAndJobScheduler;
+import org.apache.hertzbeat.manager.scheduler.CollectorJobScheduler;
import org.apache.hertzbeat.manager.scheduler.SchedulerProperties;
import
org.apache.hertzbeat.manager.scheduler.netty.process.CollectCyclicDataResponseProcessor;
import
org.apache.hertzbeat.manager.scheduler.netty.process.CollectOneTimeDataResponseProcessor;
@@ -53,7 +53,7 @@ import org.springframework.stereotype.Component;
@Slf4j
public class ManageServer implements CommandLineRunner {
- private final CollectorAndJobScheduler collectorAndJobScheduler;
+ private final CollectorJobScheduler collectorJobScheduler;
private ScheduledExecutorService channelSchedule;
@@ -62,10 +62,10 @@ public class ManageServer implements CommandLineRunner {
private final Map<String, Channel> clientChannelTable = new
ConcurrentHashMap<>(16);
public ManageServer(final SchedulerProperties schedulerProperties,
- final CollectorAndJobScheduler
collectorAndJobScheduler,
+ final CollectorJobScheduler collectorJobScheduler,
final CommonThreadPool threadPool) {
- this.collectorAndJobScheduler = collectorAndJobScheduler;
- this.collectorAndJobScheduler.setManageServer(this);
+ this.collectorJobScheduler = collectorJobScheduler;
+ this.collectorJobScheduler.setManageServer(this);
this.init(schedulerProperties, threadPool);
}
@@ -95,7 +95,7 @@ public class ManageServer implements CommandLineRunner {
if (!channel.isActive()) {
channel.closeFuture();
this.clientChannelTable.remove(collector);
-
this.collectorAndJobScheduler.collectorGoOffline(collector);
+
this.collectorJobScheduler.collectorGoOffline(collector);
}
});
} catch (Exception e) {
@@ -110,8 +110,8 @@ public class ManageServer implements CommandLineRunner {
this.channelSchedule.shutdownNow();
}
- public CollectorAndJobScheduler getCollectorAndJobScheduler() {
- return collectorAndJobScheduler;
+ public CollectorJobScheduler getCollectorAndJobScheduler() {
+ return collectorJobScheduler;
}
public Channel getChannel(final String identity) {
@@ -134,7 +134,7 @@ public class ManageServer implements CommandLineRunner {
public void closeChannel(final String identity) {
Channel channel = this.getChannel(identity);
if (channel != null) {
- this.collectorAndJobScheduler.collectorGoOffline(identity);
+ this.collectorJobScheduler.collectorGoOffline(identity);
ClusterMsg.Message message =
ClusterMsg.Message.newBuilder().setType(ClusterMsg.MessageType.GO_CLOSE).build();
this.remotingServer.sendMsg(channel, message);
this.clientChannelTable.remove(identity);
@@ -142,7 +142,7 @@ public class ManageServer implements CommandLineRunner {
}
}
- public boolean isChannelExist(final String identity) {
+ public boolean isChannelActive(final String identity) {
Channel channel = this.clientChannelTable.get(identity);
return channel != null && channel.isActive();
}
@@ -185,7 +185,7 @@ public class ManageServer implements CommandLineRunner {
}
if (identity != null) {
ManageServer.this.clientChannelTable.remove(identity);
-
ManageServer.this.collectorAndJobScheduler.collectorGoOffline(identity);
+
ManageServer.this.collectorJobScheduler.collectorGoOffline(identity);
channel.close();
log.info("handle idle event triggered. the client {} is going
offline.", identity);
}
diff --git
a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
index c5a12b597..8cf705027 100644
---
a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
+++
b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
@@ -38,9 +38,16 @@ public class HeartbeatProcessor implements
NettyRemotingProcessor {
@Override
public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
String identity = message.getIdentity();
- boolean isChannelExist = this.manageServer.isChannelExist(identity);
- if (!isChannelExist) {
- log.info("the collector {} is not online.", identity);
+ boolean isChannelActive = this.manageServer.isChannelActive(identity);
+ if (!isChannelActive) {
+ this.manageServer.addChannel(identity, ctx.channel());
+ isChannelActive = this.manageServer.isChannelActive(identity);
+ if (!isChannelActive) {
+ log.info("the collector {} is not online.", identity);
+ return null;
+ } else {
+
this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(identity,
null);
+ }
}
if (log.isDebugEnabled()) {
log.debug("server receive collector {} heartbeat",
message.getIdentity());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]