This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch dev-2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/dev-2.0.0 by this push:
new b9f1d65cb5 [refactor] Apache Fory (#3873)
b9f1d65cb5 is described below
commit b9f1d65cb58a6ef0e6832741eb27557e70fc3dfc
Author: Logic <[email protected]>
AuthorDate: Thu Nov 27 17:28:44 2025 +0800
[refactor] Apache Fory (#3873)
Signed-off-by: Logic <[email protected]>
Co-authored-by: Duansg <[email protected]>
---
.../dispatch/entrance/CollectServerTest.java | 6 +-
.../entrance/internal/CollectJobServiceTest.java | 6 +-
.../collector/dispatch/entrance/CollectServer.java | 43 ++++----
.../entrance/internal/CollectJobService.java | 35 +++----
.../processor/CollectCyclicDataProcessor.java | 6 +-
.../processor/CollectOneTimeDataProcessor.java | 6 +-
.../processor/DeleteCyclicTaskProcessor.java | 6 +-
.../entrance/processor/GoCloseProcessor.java | 8 +-
.../entrance/processor/GoOfflineProcessor.java | 19 ++--
.../entrance/processor/GoOnlineProcessor.java | 28 +++--
.../entrance/processor/HeartbeatProcessor.java | 4 +-
hertzbeat-common/pom.xml | 11 ++
.../common/entity/message/ClusterMessage.java | 114 +++++++++++++++++++++
.../apache/hertzbeat/common/util/ArrowUtil.java | 13 ++-
.../org/apache/hertzbeat/common/util/JsonUtil.java | 42 +++++++-
.../manager/scheduler/CollectorJobScheduler.java | 87 ++++++++--------
.../manager/scheduler/netty/ManageServer.java | 20 ++--
.../CollectCyclicDataResponseProcessor.java | 10 +-
...yclicServiceDiscoveryDataResponseProcessor.java | 6 +-
.../CollectOneTimeDataResponseProcessor.java | 6 +-
.../netty/process/CollectorOfflineProcessor.java | 4 +-
.../netty/process/CollectorOnlineProcessor.java | 17 ++-
.../netty/process/HeartbeatProcessor.java | 10 +-
.../scheduler/CollectorJobSchedulerTest.java | 8 +-
.../apache/hertzbeat/remoting/RemotingClient.java | 8 +-
.../apache/hertzbeat/remoting/RemotingServer.java | 8 +-
.../apache/hertzbeat/remoting/netty/NettyHook.java | 4 +-
.../remoting/netty/NettyRemotingAbstract.java | 35 +++----
.../remoting/netty/NettyRemotingClient.java | 44 ++++----
.../remoting/netty/NettyRemotingProcessor.java | 4 +-
.../remoting/netty/NettyRemotingServer.java | 38 +++----
.../hertzbeat/remoting/netty/ResponseFuture.java | 8 +-
.../hertzbeat/remoting/netty/codec/ForyCodec.java | 64 ++++++++++++
.../hertzbeat/remoting/RemotingServiceTest.java | 60 +++++------
material/licenses/backend/LICENSE | 1 +
pom.xml | 1 +
36 files changed, 501 insertions(+), 289 deletions(-)
diff --git
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServerTest.java
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServerTest.java
index 3112e1e5ba..a4123c80c7 100644
---
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServerTest.java
+++
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServerTest.java
@@ -29,7 +29,7 @@ import
org.apache.hertzbeat.collector.dispatch.CollectorInfoProperties;
import org.apache.hertzbeat.collector.dispatch.DispatchProperties;
import
org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectJobService;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.CommonThreadPool;
import org.apache.hertzbeat.remoting.RemotingClient;
import org.junit.jupiter.api.BeforeEach;
@@ -113,7 +113,7 @@ class CollectServerTest {
RemotingClient remotingClient = mock(RemotingClient.class);
ReflectionTestUtils.setField(collectServer, "remotingClient",
remotingClient);
- ClusterMsg.Message message = mock(ClusterMsg.Message.class);
+ ClusterMessage message = mock(ClusterMessage.class);
collectServer.sendMsg(message);
@@ -135,7 +135,7 @@ class CollectServerTest {
collectNettyEventListener.onChannelActive(channel);
verify(timerDispatch, times(1)).goOnline();
- verify(remotingClient,
times(1)).sendMsg(any(ClusterMsg.Message.class));
+ verify(remotingClient, times(1)).sendMsg(any(ClusterMessage.class));
ScheduledExecutorService scheduledExecutor =
(ScheduledExecutorService)
ReflectionTestUtils.getField(collectServer, "scheduledExecutor");
diff --git
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobServiceTest.java
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobServiceTest.java
index 574103b0a8..125eded7dd 100644
---
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobServiceTest.java
+++
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobServiceTest.java
@@ -34,7 +34,7 @@ import org.apache.hertzbeat.collector.dispatch.WorkerPool;
import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
import org.apache.hertzbeat.common.entity.job.Job;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -116,7 +116,7 @@ class CollectJobServiceTest {
collectJobService.collectSyncOneTimeJobData(job);
- verify(collectServer, times(1)).sendMsg(any(ClusterMsg.Message.class));
+ verify(collectServer, times(1)).sendMsg(any(ClusterMessage.class));
}
@Test
@@ -137,7 +137,7 @@ class CollectJobServiceTest {
.build();
collectJobService.sendAsyncCollectData(metricsData);
- verify(collectServer, times(1)).sendMsg(any(ClusterMsg.Message.class));
+ verify(collectServer, times(1)).sendMsg(any(ClusterMessage.class));
}
@Test
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java
index 9b34a11d4b..6fd62e04ec 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,6 @@
package org.apache.hertzbeat.collector.dispatch.entrance;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.dispatch.CollectorInfoProperties;
@@ -33,7 +32,7 @@ import
org.apache.hertzbeat.collector.dispatch.entrance.processor.GoOnlineProces
import
org.apache.hertzbeat.collector.dispatch.entrance.processor.HeartbeatProcessor;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
import org.apache.hertzbeat.common.entity.dto.CollectorInfo;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.CommonThreadPool;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.remoting.RemotingClient;
@@ -65,7 +64,7 @@ public class CollectServer implements CommandLineRunner {
private final TimerDispatch timerDispatch;
private final CollectorInfoProperties infoProperties;
-
+
private RemotingClient remotingClient;
private ScheduledExecutorService scheduledExecutor;
@@ -97,13 +96,13 @@ public class CollectServer implements CommandLineRunner {
nettyClientConfig.setServerPort(nettyProperties.getManagerPort());
this.remotingClient = new NettyRemotingClient(nettyClientConfig, new
CollectNettyEventListener(), threadPool);
-
this.remotingClient.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, new
HeartbeatProcessor());
-
this.remotingClient.registerProcessor(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK,
new CollectCyclicDataProcessor(this));
-
this.remotingClient.registerProcessor(ClusterMsg.MessageType.DELETE_CYCLIC_TASK,
new DeleteCyclicTaskProcessor(this));
-
this.remotingClient.registerProcessor(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK,
new CollectOneTimeDataProcessor(this));
-
this.remotingClient.registerProcessor(ClusterMsg.MessageType.GO_OFFLINE, new
GoOfflineProcessor());
-
this.remotingClient.registerProcessor(ClusterMsg.MessageType.GO_ONLINE, new
GoOnlineProcessor());
- this.remotingClient.registerProcessor(ClusterMsg.MessageType.GO_CLOSE,
new GoCloseProcessor(this));
+
this.remotingClient.registerProcessor(ClusterMessage.MessageType.HEARTBEAT, new
HeartbeatProcessor());
+
this.remotingClient.registerProcessor(ClusterMessage.MessageType.ISSUE_CYCLIC_TASK,
new CollectCyclicDataProcessor(this));
+
this.remotingClient.registerProcessor(ClusterMessage.MessageType.DELETE_CYCLIC_TASK,
new DeleteCyclicTaskProcessor(this));
+
this.remotingClient.registerProcessor(ClusterMessage.MessageType.ISSUE_ONE_TIME_TASK,
new CollectOneTimeDataProcessor(this));
+
this.remotingClient.registerProcessor(ClusterMessage.MessageType.GO_OFFLINE,
new GoOfflineProcessor());
+
this.remotingClient.registerProcessor(ClusterMessage.MessageType.GO_ONLINE, new
GoOnlineProcessor());
+
this.remotingClient.registerProcessor(ClusterMessage.MessageType.GO_CLOSE, new
GoCloseProcessor(this));
}
public void shutdown() {
@@ -116,7 +115,7 @@ public class CollectServer implements CommandLineRunner {
return collectJobService;
}
- public void sendMsg(final ClusterMsg.Message message) {
+ public void sendMsg(final ClusterMessage message) {
this.remotingClient.sendMsg(message);
}
@@ -143,10 +142,10 @@ public class CollectServer implements CommandLineRunner {
.build();
timerDispatch.goOnline();
// send online message
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setIdentity(identity)
- .setType(ClusterMsg.MessageType.GO_ONLINE)
-
.setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(collectorInfo)))
+ ClusterMessage message = ClusterMessage.builder()
+ .identity(identity)
+ .type(ClusterMessage.MessageType.GO_ONLINE)
+ .msg(JsonUtil.toJsonBytes(collectorInfo))
.build();
CollectServer.this.sendMsg(message);
@@ -163,13 +162,13 @@ public class CollectServer implements CommandLineRunner {
// schedule send heartbeat message
scheduledExecutor.scheduleAtFixedRate(() -> {
try {
- ClusterMsg.Message heartbeat =
ClusterMsg.Message.newBuilder()
- .setIdentity(identity)
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setType(ClusterMsg.MessageType.HEARTBEAT)
+ ClusterMessage heartbeat = ClusterMessage.builder()
+ .identity(identity)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .type(ClusterMessage.MessageType.HEARTBEAT)
.build();
CollectServer.this.sendMsg(heartbeat);
- log.info("collector send cluster server heartbeat,
time: {}.", System.currentTimeMillis());
+ log.info("collector send cluster server heartbeat,
time: {}.", System.currentTimeMillis());
} catch (Exception e) {
log.error("schedule send heartbeat to server
error.{}", e.getMessage());
}
@@ -182,4 +181,4 @@ public class CollectServer implements CommandLineRunner {
log.info("handle idle event triggered. collector is going
offline.");
}
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobService.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobService.java
index 616abdbfe5..247a06a3ee 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobService.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobService.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,6 @@
package org.apache.hertzbeat.collector.dispatch.entrance.internal;
-import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.dispatch.DispatchProperties;
import org.apache.hertzbeat.collector.dispatch.WorkerPool;
@@ -25,7 +24,7 @@ import
org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.job.Job;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.util.ArrowUtil;
import org.apache.hertzbeat.common.util.IpDomainUtil;
@@ -117,10 +116,10 @@ public class CollectJobService {
workerPool.executeJob(() -> {
List<CollectRep.MetricsData> metricsDataList =
this.collectSyncJobData(oneTimeJob);
byte[] msg = ArrowUtil.serializeMetricsData(metricsDataList);
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setMsg(ByteString.copyFrom(msg))
- .setDirection(ClusterMsg.Direction.REQUEST)
-
.setType(ClusterMsg.MessageType.RESPONSE_ONE_TIME_TASK_DATA)
+ ClusterMessage message = ClusterMessage.builder()
+ .msg(msg)
+ .direction(ClusterMessage.Direction.REQUEST)
+
.type(ClusterMessage.MessageType.RESPONSE_ONE_TIME_TASK_DATA)
.build();
this.collectServer.sendMsg(message);
});
@@ -153,22 +152,22 @@ public class CollectJobService {
*/
public void sendAsyncCollectData(CollectRep.MetricsData metricsData) {
byte[] msg = ArrowUtil.serializeMetricsData(List.of(metricsData));
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setIdentity(collectorIdentity)
- .setMsg(ByteString.copyFrom(msg))
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setType(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_DATA)
+ ClusterMessage message = ClusterMessage.builder()
+ .identity(collectorIdentity)
+ .msg(msg)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .type(ClusterMessage.MessageType.RESPONSE_CYCLIC_TASK_DATA)
.build();
this.collectServer.sendMsg(message);
}
public void sendAsyncServiceDiscoveryData(CollectRep.MetricsData
metricsData) {
byte[] msg = ArrowUtil.serializeMetricsData(List.of(metricsData));
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setIdentity(collectorIdentity)
- .setMsg(ByteString.copyFrom(msg))
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setType(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA)
+ ClusterMessage message = ClusterMessage.builder()
+ .identity(collectorIdentity)
+ .msg(msg)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .type(ClusterMessage.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA)
.build();
this.collectServer.sendMsg(message);
}
@@ -184,4 +183,4 @@ public class CollectJobService {
public void setCollectServer(CollectServer collectServer) {
this.collectServer = collectServer;
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectCyclicDataProcessor.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectCyclicDataProcessor.java
index f37f71d850..cd01bcdf38 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectCyclicDataProcessor.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectCyclicDataProcessor.java
@@ -21,7 +21,7 @@ import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
import org.apache.hertzbeat.common.entity.job.Job;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@@ -37,8 +37,8 @@ public class CollectCyclicDataProcessor implements
NettyRemotingProcessor {
}
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
- Job job = JsonUtil.fromJson(message.getMsg().toStringUtf8(),
Job.class);
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
+ Job job = JsonUtil.fromJson(message.getMsg(), Job.class);
if (job == null) {
log.error("collector receive cyclic task job is null");
return null;
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectOneTimeDataProcessor.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectOneTimeDataProcessor.java
index 74f3d526cd..49cba7e611 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectOneTimeDataProcessor.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectOneTimeDataProcessor.java
@@ -20,7 +20,7 @@ package
org.apache.hertzbeat.collector.dispatch.entrance.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
import org.apache.hertzbeat.common.entity.job.Job;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@@ -35,8 +35,8 @@ public class CollectOneTimeDataProcessor implements
NettyRemotingProcessor {
}
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
- Job oneTimeJob = JsonUtil.fromJson(message.getMsg().toStringUtf8(),
Job.class);
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
+ Job oneTimeJob = JsonUtil.fromJson(message.getMsg(), Job.class);
collectServer.getCollectJobService().collectSyncOneTimeJobData(oneTimeJob);
return null;
}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/DeleteCyclicTaskProcessor.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/DeleteCyclicTaskProcessor.java
index 0499a8b2e0..b715dd6240 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/DeleteCyclicTaskProcessor.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/DeleteCyclicTaskProcessor.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@@ -39,9 +39,9 @@ public class DeleteCyclicTaskProcessor implements
NettyRemotingProcessor {
}
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
TypeReference<List<Long>> typeReference = new TypeReference<>() {};
- List<Long> jobIds = JsonUtil.fromJson(message.getMsg().toStringUtf8(),
typeReference);
+ List<Long> jobIds = JsonUtil.fromJson(message.getMsgString(),
typeReference);
if (jobIds == null || jobIds.isEmpty()) {
log.error("collector receive delete cyclic task job ids is null");
return null;
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoCloseProcessor.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoCloseProcessor.java
index 4b068f1e45..9b2674ec31 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoCloseProcessor.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoCloseProcessor.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.SpringContextHolder;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
import org.springframework.boot.SpringApplication;
@@ -41,12 +41,12 @@ public class GoCloseProcessor implements
NettyRemotingProcessor {
}
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
if (this.timerDispatch == null) {
this.timerDispatch =
SpringContextHolder.getBean(TimerDispatch.class);
}
- if
(message.getMsg().toStringUtf8().contains(CommonConstants.COLLECTOR_AUTH_FAILED))
{
- log.error("[Auth Failed]receive client auth failed message and go
close. {}", message.getMsg());
+ if
(message.getMsgString().contains(CommonConstants.COLLECTOR_AUTH_FAILED)) {
+ log.error("[Auth Failed]receive client auth failed message and go
close. {}", message.getMsgString());
}
this.timerDispatch.goOffline();
this.collectServer.shutdown();
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOfflineProcessor.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOfflineProcessor.java
index aad85194ee..c3200b4b29 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOfflineProcessor.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOfflineProcessor.java
@@ -17,12 +17,12 @@
package org.apache.hertzbeat.collector.dispatch.entrance.processor;
-import com.google.protobuf.ByteString;
import io.netty.channel.ChannelHandlerContext;
+import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.SpringContextHolder;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@@ -36,20 +36,21 @@ public class GoOfflineProcessor implements
NettyRemotingProcessor {
private TimerDispatch timerDispatch;
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
if (this.timerDispatch == null) {
this.timerDispatch =
SpringContextHolder.getBean(TimerDispatch.class);
}
timerDispatch.goOffline();
log.info("receive offline message and handle success");
- if
(message.getMsg().toStringUtf8().contains(CommonConstants.COLLECTOR_AUTH_FAILED))
{
- log.error("[Auth Failed]receive client auth failed message and go
offline. {}", message.getMsg());
+ if
(message.getMsgString().contains(CommonConstants.COLLECTOR_AUTH_FAILED)) {
+ log.error("[Auth Failed]receive client auth failed message and go
offline. {}", message.getMsgString());
return null;
}
- return ClusterMsg.Message.newBuilder()
- .setIdentity(message.getIdentity())
- .setDirection(ClusterMsg.Direction.RESPONSE)
-
.setMsg(ByteString.copyFromUtf8(String.valueOf(CommonConstants.SUCCESS_CODE)))
+ return ClusterMessage.builder()
+ .identity(message.getIdentity())
+ .direction(ClusterMessage.Direction.RESPONSE)
+ .type(ClusterMessage.MessageType.GO_OFFLINE)
+
.msg(String.valueOf(CommonConstants.SUCCESS_CODE).getBytes(StandardCharsets.UTF_8))
.build();
}
}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
index 34dda15197..e7a2fd3344 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,11 @@
package org.apache.hertzbeat.collector.dispatch.entrance.processor;
-import com.google.protobuf.ByteString;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
-import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.dto.ServerInfo;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.SpringContextHolder;
import org.apache.hertzbeat.common.util.AesUtil;
import org.apache.hertzbeat.common.util.JsonUtil;
@@ -35,18 +33,20 @@ import
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
*/
@Slf4j
public class GoOnlineProcessor implements NettyRemotingProcessor {
-
+
private TimerDispatch timerDispatch;
-
+
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
if (this.timerDispatch == null) {
this.timerDispatch =
SpringContextHolder.getBean(TimerDispatch.class);
}
- if (message.getMsg().isEmpty()) {
+ String msgString = message.getMsgString();
+ if (message.getMsg() == null || msgString == null ||
msgString.isEmpty()) {
log.warn("The message that server response to collector is empty,
please upgrade server");
} else {
- ServerInfo serverInfo =
JsonUtil.fromJson(message.getMsg().toStringUtf8(), ServerInfo.class);
+ // Use the new JsonUtil.fromJson(byte[], Class) method
+ ServerInfo serverInfo = JsonUtil.fromJson(message.getMsg(),
ServerInfo.class);
if (serverInfo == null || serverInfo.getAesSecret() == null) {
log.warn("The message that server response to collector has
not secret empty, please check");
} else {
@@ -55,10 +55,8 @@ public class GoOnlineProcessor implements
NettyRemotingProcessor {
}
timerDispatch.goOnline();
log.info("receive online message and handle success");
- return ClusterMsg.Message.newBuilder()
- .setIdentity(message.getIdentity())
- .setDirection(ClusterMsg.Direction.RESPONSE)
-
.setMsg(ByteString.copyFromUtf8(String.valueOf(CommonConstants.SUCCESS_CODE)))
- .build();
+ // Return null to stop the ping-pong loop.
+ // The collector should not reply to the server's confirmation
response.
+ return null;
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/HeartbeatProcessor.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/HeartbeatProcessor.java
index 9fc036cf01..eece81bdd1 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/HeartbeatProcessor.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/HeartbeatProcessor.java
@@ -19,7 +19,7 @@ package
org.apache.hertzbeat.collector.dispatch.entrance.processor;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
/**
@@ -28,7 +28,7 @@ import
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@Slf4j
public class HeartbeatProcessor implements NettyRemotingProcessor {
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
log.info("collector receive manager server response heartbeat, time:
{}. ", System.currentTimeMillis());
return null;
}
diff --git a/hertzbeat-common/pom.xml b/hertzbeat-common/pom.xml
index 8b31253382..c273ca7430 100644
--- a/hertzbeat-common/pom.xml
+++ b/hertzbeat-common/pom.xml
@@ -171,6 +171,17 @@
<version>${javaparser.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.fory</groupId>
+ <artifactId>fory-core</artifactId>
+ <version>${fory.version}</version>
+ </dependency>
+ <!-- row/arrow format support -->
+ <dependency>
+ <groupId>org.apache.fory</groupId>
+ <artifactId>fory-format</artifactId>
+ <version>${fory.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/message/ClusterMessage.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/message/ClusterMessage.java
new file mode 100644
index 0000000000..7db5a69448
--- /dev/null
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/message/ClusterMessage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.entity.message;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.nio.charset.StandardCharsets;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * cluster message entity for fury serialization
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class ClusterMessage implements Serializable {
+
+ /**
+ * collector identity
+ */
+ @Builder.Default
+ private String identity = "";
+
+ /**
+ * message direction
+ */
+ @Builder.Default
+ private Direction direction = Direction.REQUEST;
+
+ /**
+ * message type
+ */
+ private MessageType type;
+
+ /**
+ * message content
+ * Use byte[] to ensure data integrity for both JSON strings (UTF-8) and
Binary data (Arrow).
+ * Avoids String encoding/decoding issues (Mojibake) across different JVMs
or Languages.
+ */
+ private byte[] msg;
+
+ @JsonIgnore
+ public String getMsgString() {
+ if (this.msg == null) {
+ return null;
+ }
+ return new String(this.msg, StandardCharsets.UTF_8);
+ }
+
+ @JsonIgnore
+ public void setMsgString(String jsonString) {
+ if (jsonString != null) {
+ this.msg = jsonString.getBytes(StandardCharsets.UTF_8);
+ } else {
+ this.msg = null;
+ }
+ }
+
+ /**
+ * Message Type Enum
+ */
+ public enum MessageType {
+ // heartbeat message
+ HEARTBEAT,
+ // collector go online to master message
+ GO_ONLINE,
+ // collector go offline to master message
+ GO_OFFLINE,
+ // collector go close to master
+ GO_CLOSE,
+ // issue cyclic collect task
+ ISSUE_CYCLIC_TASK,
+ // delete cyclic collect task
+ DELETE_CYCLIC_TASK,
+ // issue one-time collect task
+ ISSUE_ONE_TIME_TASK,
+ // response one-time collect data
+ RESPONSE_ONE_TIME_TASK_DATA,
+ // response cyclic collect data
+ RESPONSE_CYCLIC_TASK_DATA,
+ // response cyclic service discovery data
+ RESPONSE_CYCLIC_TASK_SD_DATA
+ }
+
+ /**
+ * Direction Enum
+ */
+ public enum Direction {
+ // request message
+ REQUEST,
+ // request response
+ RESPONSE
+ }
+}
\ No newline at end of file
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
index f2a04b6af9..82188a6b08 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -37,7 +37,7 @@ import org.apache.hertzbeat.common.entity.message.CollectRep;
*/
@Slf4j
public final class ArrowUtil {
-
+
private ArrowUtil() {
}
@@ -53,7 +53,7 @@ public final class ArrowUtil {
public static byte[] serializeMultipleRoots(List<VectorSchemaRoot> roots) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out)) {
-
+
dataOut.writeInt(roots.size());
for (VectorSchemaRoot root : roots) {
ArrowStreamWriter writer = new ArrowStreamWriter(
@@ -87,10 +87,10 @@ public final class ArrowUtil {
List<VectorSchemaRoot> roots = new ArrayList<>();
try (ByteArrayInputStream in = new ByteArrayInputStream(data);
DataInputStream dataIn = new DataInputStream(in)) {
-
+
int rootCount = dataIn.readInt();
RootAllocator allocator = new RootAllocator();
-
+
for (int i = 0; i < rootCount; i++) {
ArrowStreamReader reader = new ArrowStreamReader(
Channels.newChannel(in),
@@ -153,5 +153,4 @@ public final class ArrowUtil {
}
return serializeMultipleRoots(roots);
}
-
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/JsonUtil.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/JsonUtil.java
index b7a46e3a03..ca3ce0846f 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/JsonUtil.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/JsonUtil.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -62,6 +62,23 @@ public final class JsonUtil {
}
}
+ /**
+ * Object to byte array
+ * @param source object
+ * @return byte array
+ */
+ public static byte[] toJsonBytes(Object source) {
+ if (source == null) {
+ return null;
+ }
+ try {
+ return OBJECT_MAPPER.writeValueAsBytes(source);
+ } catch (JsonProcessingException e) {
+ log.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
public static <T> T fromJson(String jsonStr, Class<T> clazz) {
if (!StringUtils.hasText(jsonStr)) {
return null;
@@ -74,6 +91,25 @@ public final class JsonUtil {
}
}
+ /**
+ * byte array to Object
+ * @param jsonBytes json byte array
+ * @param clazz object class
+ * @param <T> object type
+ * @return object
+ */
+ public static <T> T fromJson(byte[] jsonBytes, Class<T> clazz) {
+ if (jsonBytes == null || jsonBytes.length == 0) {
+ return null;
+ }
+ try {
+ return OBJECT_MAPPER.readValue(jsonBytes, clazz);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
public static <T> T fromJson(String jsonStr, TypeReference<T> type) {
if (!StringUtils.hasText(jsonStr)) {
return null;
@@ -85,7 +121,7 @@ public final class JsonUtil {
return null;
}
}
-
+
public static JsonNode fromJson(String jsonStr) {
if (!StringUtils.hasText(jsonStr)) {
return null;
@@ -143,4 +179,4 @@ public final class JsonUtil {
char end = jsonStr.charAt(jsonStr.length() - 1);
return (start == '{' && end == '}') || (start == '[' && end == ']');
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
index c939200e76..3a0682fc74 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,6 @@
package org.apache.hertzbeat.manager.scheduler;
-import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
@@ -44,7 +43,7 @@ import
org.apache.hertzbeat.common.entity.manager.CollectorMonitorBind;
import org.apache.hertzbeat.common.entity.manager.Monitor;
import org.apache.hertzbeat.common.entity.manager.Param;
import org.apache.hertzbeat.common.entity.manager.ParamDefine;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.util.AesUtil;
import org.apache.hertzbeat.common.util.JsonUtil;
@@ -155,9 +154,9 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
List<Param> params =
paramDao.findParamsByMonitorId(monitor.getId());
List<Configmap> configmaps = params.stream()
.map(param -> Configmap.builder()
- .key(param.getField())
- .value(param.getParamValue())
-
.type(param.getType()).build()).collect(Collectors.toList());
+ .key(param.getField())
+ .value(param.getParamValue())
+
.type(param.getType()).build()).collect(Collectors.toList());
List<ParamDefine> paramDefaultValue =
appDefine.getParams().stream()
.filter(item ->
StringUtils.isNotBlank(item.getDefaultValue()))
.toList();
@@ -214,10 +213,10 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
if
(CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) {
collectJobService.addAsyncCollectJob(job);
} else {
- ClusterMsg.Message message =
ClusterMsg.Message.newBuilder()
- .setDirection(ClusterMsg.Direction.REQUEST)
-
.setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK)
-
.setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+ ClusterMessage message = ClusterMessage.builder()
+ .direction(ClusterMessage.Direction.REQUEST)
+
.type(ClusterMessage.MessageType.ISSUE_CYCLIC_TASK)
+ .msg(JsonUtil.toJsonBytes(job))
.build();
this.manageServer.sendMsg(collectorName, message);
}
@@ -229,10 +228,10 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName))
{
assignJobs.getRemovingJobs().forEach(jobId ->
collectJobService.cancelAsyncCollectJob(jobId));
} else {
- ClusterMsg.Message message =
ClusterMsg.Message.newBuilder()
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK)
-
.setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(assignJobs.getRemovingJobs())))
+ ClusterMessage message = ClusterMessage.builder()
+ .direction(ClusterMessage.Direction.REQUEST)
+
.type(ClusterMessage.MessageType.DELETE_CYCLIC_TASK)
+
.msg(JsonUtil.toJsonBytes(assignJobs.getRemovingJobs()))
.build();
this.manageServer.sendMsg(collectorName, message);
}
@@ -243,13 +242,13 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
@Override
public boolean offlineCollector(String identity) {
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setType(ClusterMsg.MessageType.GO_OFFLINE)
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setIdentity(identity)
+ ClusterMessage message = ClusterMessage.builder()
+ .type(ClusterMessage.MessageType.GO_OFFLINE)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .identity(identity)
.build();
- ClusterMsg.Message response = this.manageServer.sendMsgSync(identity,
message);
- if (response == null ||
!String.valueOf(CommonConstants.SUCCESS_CODE).equals(response.getMsg().toStringUtf8()))
{
+ ClusterMessage response = this.manageServer.sendMsgSync(identity,
message);
+ if (response == null ||
!String.valueOf(CommonConstants.SUCCESS_CODE).equals(response.getMsgString())) {
return false;
}
log.info("send offline collector message to {} success", identity);
@@ -264,14 +263,14 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
return false;
}
ServerInfo serverInfo =
ServerInfo.builder().aesSecret(AesUtil.getDefaultSecretKey()).build();
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setType(ClusterMsg.MessageType.GO_ONLINE)
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(serverInfo)))
- .setIdentity(identity)
+ ClusterMessage message = ClusterMessage.builder()
+ .type(ClusterMessage.MessageType.GO_ONLINE)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .msg(JsonUtil.toJsonBytes(serverInfo))
+ .identity(identity)
.build();
- ClusterMsg.Message response = this.manageServer.sendMsgSync(identity,
message);
- if (response == null ||
!String.valueOf(CommonConstants.SUCCESS_CODE).equals(response.getMsg().toStringUtf8()))
{
+ ClusterMessage response = this.manageServer.sendMsgSync(identity,
message);
+ if (response == null ||
!String.valueOf(CommonConstants.SUCCESS_CODE).equals(response.getMsgString())) {
return false;
}
log.info("send online collector message to {} success", identity);
@@ -304,10 +303,10 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
List<CollectRep.MetricsData> metricsData = new LinkedList<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+ ClusterMessage message = ClusterMessage.builder()
+ .type(ClusterMessage.MessageType.ISSUE_ONE_TIME_TASK)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .msg(JsonUtil.toJsonBytes(job))
.build();
boolean result = this.manageServer.sendMsg(node.getIdentity(),
message);
@@ -347,10 +346,10 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
return collectJobService.collectSyncJobData(job);
}
List<CollectRep.MetricsData> metricsData = new LinkedList<>();
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+ ClusterMessage message = ClusterMessage.builder()
+ .type(ClusterMessage.MessageType.ISSUE_ONE_TIME_TASK)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .msg(JsonUtil.toJsonBytes(job))
.build();
boolean result = this.manageServer.sendMsg(node.getIdentity(),
message);
if (result) {
@@ -399,10 +398,10 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
if (CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) {
collectJobService.addAsyncCollectJob(job);
} else {
- ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
- .setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK)
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+ ClusterMessage message = ClusterMessage.builder()
+ .type(ClusterMessage.MessageType.ISSUE_CYCLIC_TASK)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .msg(JsonUtil.toJsonBytes(job))
.build();
this.manageServer.sendMsg(node.getIdentity(), message);
}
@@ -443,10 +442,10 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
if
(CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) {
collectJobService.cancelAsyncCollectJob(jobId);
} else {
- ClusterMsg.Message deleteMessage =
ClusterMsg.Message.newBuilder()
- .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK)
- .setDirection(ClusterMsg.Direction.REQUEST)
-
.setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(List.of(jobId))))
+ ClusterMessage deleteMessage = ClusterMessage.builder()
+
.type(ClusterMessage.MessageType.DELETE_CYCLIC_TASK)
+ .direction(ClusterMessage.Direction.REQUEST)
+ .msg(JsonUtil.toJsonBytes(List.of(jobId)))
.build();
this.manageServer.sendMsg(node.getIdentity(),
deleteMessage);
}
@@ -471,4 +470,4 @@ public class CollectorJobScheduler implements
CollectorScheduling, CollectJobSch
public void setManageServer(ManageServer manageServer) {
this.manageServer = manageServer;
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
index 352415b65f..fe2d5e6bb2 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
@@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.alert.calculate.CollectorAlertHandler;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.CommonThreadPool;
import org.apache.hertzbeat.manager.scheduler.CollectorJobScheduler;
import org.apache.hertzbeat.manager.scheduler.SchedulerProperties;
@@ -83,12 +83,12 @@ public class ManageServer implements CommandLineRunner {
this.remotingServer = new NettyRemotingServer(nettyServerConfig,
nettyEventListener, threadPool);
// register processor
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, new
HeartbeatProcessor(this));
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.GO_ONLINE, new
CollectorOnlineProcessor(this));
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.GO_OFFLINE, new
CollectorOfflineProcessor(this));
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.RESPONSE_ONE_TIME_TASK_DATA,
new CollectOneTimeDataResponseProcessor(this));
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_DATA,
new CollectCyclicDataResponseProcessor());
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA,
new CollectCyclicServiceDiscoveryDataResponseProcessor());
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.HEARTBEAT, new
HeartbeatProcessor(this));
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.GO_ONLINE, new
CollectorOnlineProcessor(this));
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.GO_OFFLINE,
new CollectorOfflineProcessor(this));
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.RESPONSE_ONE_TIME_TASK_DATA,
new CollectOneTimeDataResponseProcessor(this));
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.RESPONSE_CYCLIC_TASK_DATA,
new CollectCyclicDataResponseProcessor());
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA,
new CollectCyclicServiceDiscoveryDataResponseProcessor());
this.channelSchedule = Executors.newSingleThreadScheduledExecutor();
}
@@ -144,7 +144,7 @@ public class ManageServer implements CommandLineRunner {
Channel channel = this.getChannel(identity);
if (channel != null) {
this.collectorJobScheduler.collectorGoOffline(identity);
- ClusterMsg.Message message =
ClusterMsg.Message.newBuilder().setType(ClusterMsg.MessageType.GO_CLOSE).build();
+ ClusterMessage message =
ClusterMessage.builder().type(ClusterMessage.MessageType.GO_CLOSE).build();
this.remotingServer.sendMsg(channel, message);
this.clientChannelTable.remove(identity);
log.info("close collect client success, identity: {}", identity);
@@ -156,7 +156,7 @@ public class ManageServer implements CommandLineRunner {
return channel != null && channel.isActive();
}
- public boolean sendMsg(final String identityId, final ClusterMsg.Message
message) {
+ public boolean sendMsg(final String identityId, final ClusterMessage
message) {
Channel channel = this.getChannel(identityId);
if (channel != null) {
this.remotingServer.sendMsg(channel, message);
@@ -165,7 +165,7 @@ public class ManageServer implements CommandLineRunner {
return false;
}
- public ClusterMsg.Message sendMsgSync(final String identityId, final
ClusterMsg.Message message) {
+ public ClusterMessage sendMsgSync(final String identityId, final
ClusterMessage message) {
Channel channel = this.getChannel(identityId);
if (channel != null) {
return this.remotingServer.sendMsgSync(channel, message, 3000);
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicDataResponseProcessor.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicDataResponseProcessor.java
index 74d415c27c..12937d2574 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicDataResponseProcessor.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicDataResponseProcessor.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,7 +20,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.queue.CommonDataQueue;
import org.apache.hertzbeat.common.support.SpringContextHolder;
@@ -33,9 +33,9 @@ import
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@Slf4j
public class CollectCyclicDataResponseProcessor implements
NettyRemotingProcessor {
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
CommonDataQueue dataQueue =
SpringContextHolder.getBean(CommonDataQueue.class);
- List<CollectRep.MetricsData> metricsDataList =
ArrowUtil.deserializeMetricsData(message.getMsg().toByteArray());
+ List<CollectRep.MetricsData> metricsDataList =
ArrowUtil.deserializeMetricsData(message.getMsg());
for (CollectRep.MetricsData metricsData : metricsDataList) {
if (metricsData != null) {
dataQueue.sendMetricsData(metricsData);
@@ -43,4 +43,4 @@ public class CollectCyclicDataResponseProcessor implements
NettyRemotingProcesso
}
return null;
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicServiceDiscoveryDataResponseProcessor.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicServiceDiscoveryDataResponseProcessor.java
index d8e8a8c248..174d0e7adb 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicServiceDiscoveryDataResponseProcessor.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicServiceDiscoveryDataResponseProcessor.java
@@ -20,7 +20,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.queue.CommonDataQueue;
import org.apache.hertzbeat.common.support.SpringContextHolder;
@@ -33,9 +33,9 @@ import
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@Slf4j
public class CollectCyclicServiceDiscoveryDataResponseProcessor implements
NettyRemotingProcessor {
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
CommonDataQueue dataQueue =
SpringContextHolder.getBean(CommonDataQueue.class);
- List<CollectRep.MetricsData> metricsDataList =
ArrowUtil.deserializeMetricsData(message.getMsg().toByteArray());
+ List<CollectRep.MetricsData> metricsDataList =
ArrowUtil.deserializeMetricsData(message.getMsg());
for (CollectRep.MetricsData metricsData : metricsDataList) {
if (metricsData != null) {
dataQueue.sendServiceDiscoveryData(metricsData);
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectOneTimeDataResponseProcessor.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectOneTimeDataResponseProcessor.java
index b9e064d7b4..cabf016b39 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectOneTimeDataResponseProcessor.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectOneTimeDataResponseProcessor.java
@@ -20,7 +20,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.util.ArrowUtil;
import org.apache.hertzbeat.manager.scheduler.netty.ManageServer;
@@ -39,9 +39,9 @@ public class CollectOneTimeDataResponseProcessor implements
NettyRemotingProcess
}
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
- List<CollectRep.MetricsData> metricsDataList =
ArrowUtil.deserializeMetricsData(message.getMsg().toByteArray());
+ List<CollectRep.MetricsData> metricsDataList =
ArrowUtil.deserializeMetricsData(message.getMsg());
this.manageServer.getCollectorAndJobScheduler().collectSyncJobResponse(metricsDataList);
return null;
}
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOfflineProcessor.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOfflineProcessor.java
index 85c5b0d132..137af5f0b5 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOfflineProcessor.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOfflineProcessor.java
@@ -19,7 +19,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.manager.scheduler.netty.ManageServer;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@@ -36,7 +36,7 @@ public class CollectorOfflineProcessor implements
NettyRemotingProcessor {
}
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
String collector = message.getIdentity();
log.info("the collector {} actively requests to go offline.",
collector);
this.manageServer.getCollectorAndJobScheduler().collectorGoOffline(collector);
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java
index 9a7e4f4e08..5486d4af3a 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java
@@ -17,14 +17,13 @@
package org.apache.hertzbeat.manager.scheduler.netty.process;
-import com.google.protobuf.ByteString;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.common.entity.dto.CollectorInfo;
import org.apache.hertzbeat.common.entity.dto.ServerInfo;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.util.AesUtil;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.manager.scheduler.netty.ManageServer;
@@ -42,10 +41,10 @@ public class CollectorOnlineProcessor implements
NettyRemotingProcessor {
}
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
String collector = message.getIdentity();
log.info("the collector {} actively requests to go online.",
collector);
- String msg = message.getMsg().toStringUtf8();
+ String msg = message.getMsgString();
CollectorInfo collectorInfo = JsonUtil.fromJson(msg,
CollectorInfo.class);
if (collectorInfo != null &&
StringUtils.isBlank(collectorInfo.getIp())) {
// fetch remote ip address
@@ -56,11 +55,11 @@ public class CollectorOnlineProcessor implements
NettyRemotingProcessor {
this.manageServer.addChannel(collector, ctx.channel());
this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(collector,
collectorInfo);
ServerInfo serverInfo =
ServerInfo.builder().aesSecret(AesUtil.getDefaultSecretKey()).build();
- return ClusterMsg.Message.newBuilder()
- .setIdentity(message.getIdentity())
- .setDirection(ClusterMsg.Direction.RESPONSE)
- .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(serverInfo)))
- .setType(ClusterMsg.MessageType.GO_ONLINE)
+ return ClusterMessage.builder()
+ .identity(message.getIdentity())
+ .direction(ClusterMessage.Direction.RESPONSE)
+ .msg(JsonUtil.toJsonBytes(serverInfo))
+ .type(ClusterMessage.MessageType.GO_ONLINE)
.build();
}
}
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
index 8cf7050279..62e873d0cf 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
@@ -19,7 +19,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.manager.scheduler.netty.ManageServer;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@@ -36,7 +36,7 @@ public class HeartbeatProcessor implements
NettyRemotingProcessor {
}
@Override
- public ClusterMsg.Message handle(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
+ public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage
message) {
String identity = message.getIdentity();
boolean isChannelActive = this.manageServer.isChannelActive(identity);
if (!isChannelActive) {
@@ -52,8 +52,10 @@ public class HeartbeatProcessor implements
NettyRemotingProcessor {
if (log.isDebugEnabled()) {
log.debug("server receive collector {} heartbeat",
message.getIdentity());
}
- return ClusterMsg.Message.newBuilder()
- .setType(ClusterMsg.MessageType.HEARTBEAT)
+ return ClusterMessage.builder()
+ .identity(message.getIdentity())
+ .type(ClusterMessage.MessageType.HEARTBEAT)
+ .direction(ClusterMessage.Direction.RESPONSE)
.build();
}
}
diff --git
a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java
b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java
index e068c7b7fb..e77535445a 100644
---
a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java
+++
b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.hertzbeat.common.entity.dto.CollectorInfo;
import org.apache.hertzbeat.common.entity.job.Job;
import org.apache.hertzbeat.common.entity.manager.CollectorMonitorBind;
import org.apache.hertzbeat.common.entity.manager.Monitor;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.util.JsonUtil;
import org.apache.hertzbeat.manager.dao.CollectorDao;
@@ -137,11 +137,11 @@ public class CollectorJobSchedulerTest {
collectorJobScheduler.collectorGoOnline(identity, collectorInfo);
// Capture the parameters of sendMsg
- ArgumentCaptor<ClusterMsg.Message> msgCaptor =
ArgumentCaptor.forClass(ClusterMsg.Message.class);
+ ArgumentCaptor<ClusterMessage> msgCaptor =
ArgumentCaptor.forClass(ClusterMessage.class);
verify(manageServer, atLeastOnce()).sendMsg(eq("collector-1"),
msgCaptor.capture());
- ClusterMsg.Message message = msgCaptor.getValue();
+ ClusterMessage message = msgCaptor.getValue();
- Job job = JsonUtil.fromJson(message.getMsg().toStringUtf8(),
Job.class);
+ Job job = JsonUtil.fromJson(message.getMsgString(), Job.class);
assertNotNull(job);
assertNotNull(job.getMetadata());
assertEquals("test-monitor",
job.getMetadata().get(CommonConstants.LABEL_INSTANCE_NAME));
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingClient.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingClient.java
index dc5b4a557c..af363641af 100644
---
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingClient.java
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingClient.java
@@ -17,7 +17,7 @@
package org.apache.hertzbeat.remoting;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
/**
@@ -30,13 +30,13 @@ public interface RemotingClient extends RemotingService {
* @param messageType type
* @param processor remoting processor
*/
- void registerProcessor(ClusterMsg.MessageType messageType,
NettyRemotingProcessor processor);
+ void registerProcessor(ClusterMessage.MessageType messageType,
NettyRemotingProcessor processor);
/**
* send message to server
* @param request request message
*/
- void sendMsg(ClusterMsg.Message request);
+ void sendMsg(ClusterMessage request);
/**
* send message to server and sync waiting receive server message
@@ -44,5 +44,5 @@ public interface RemotingClient extends RemotingService {
* @param timeoutMillis timeout millis
* @return response message
*/
- ClusterMsg.Message sendMsgSync(ClusterMsg.Message request, int
timeoutMillis);
+ ClusterMessage sendMsgSync(ClusterMessage request, int timeoutMillis);
}
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingServer.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingServer.java
index 35fdd1d48f..b9af22cd84 100644
---
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingServer.java
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingServer.java
@@ -19,7 +19,7 @@ package org.apache.hertzbeat.remoting;
import io.netty.channel.Channel;
import java.util.List;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.remoting.netty.NettyHook;
import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
@@ -33,14 +33,14 @@ public interface RemotingServer extends RemotingService {
* @param messageType type
* @param processor remoting processor
*/
- void registerProcessor(ClusterMsg.MessageType messageType,
NettyRemotingProcessor processor);
+ void registerProcessor(ClusterMessage.MessageType messageType,
NettyRemotingProcessor processor);
/**
* send message to client
* @param channel client channel
* @param request request message
*/
- void sendMsg(Channel channel, ClusterMsg.Message request);
+ void sendMsg(Channel channel, ClusterMessage request);
/**
* send message to client and receive client message
@@ -49,7 +49,7 @@ public interface RemotingServer extends RemotingService {
* @param timeoutMillis timeout millis
* @return response message
*/
- ClusterMsg.Message sendMsgSync(Channel channel, ClusterMsg.Message
request, int timeoutMillis);
+ ClusterMessage sendMsgSync(Channel channel, ClusterMessage request, int
timeoutMillis);
/**
* register hook.
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyHook.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyHook.java
index 27d149b5cb..0b75b4a8f6 100644
---
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyHook.java
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyHook.java
@@ -18,13 +18,13 @@
package org.apache.hertzbeat.remoting.netty;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
/**
* hook interface, handle something before request processor
*/
public interface NettyHook {
- void doBeforeRequest(ChannelHandlerContext ctx, ClusterMsg.Message
message);
+ void doBeforeRequest(ChannelHandlerContext ctx, ClusterMessage message);
}
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
index 3d9390f467..a721f7da3f 100644
---
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,19 +27,19 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.util.NetworkUtil;
import org.apache.hertzbeat.remoting.RemotingService;
import org.apache.hertzbeat.remoting.event.NettyEventListener;
/**
- * Derived from Apache Rocketmq
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
+ * Derived from Apache Rocketmq
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
* netty remote abstract
* @see <a
href="https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java">NettyRemotingAbstract</a>
*/
@Slf4j
public abstract class NettyRemotingAbstract implements RemotingService {
- protected ConcurrentHashMap<ClusterMsg.MessageType,
NettyRemotingProcessor> processorTable = new ConcurrentHashMap<>();
+ protected ConcurrentHashMap<ClusterMessage.MessageType,
NettyRemotingProcessor> processorTable = new ConcurrentHashMap<>();
protected ConcurrentHashMap<String, ResponseFuture> responseTable = new
ConcurrentHashMap<>();
@@ -51,33 +51,32 @@ public abstract class NettyRemotingAbstract implements
RemotingService {
this.nettyEventListener = nettyEventListener;
}
- public void registerProcessor(final ClusterMsg.MessageType messageType,
final NettyRemotingProcessor processor) {
+ public void registerProcessor(final ClusterMessage.MessageType
messageType, final NettyRemotingProcessor processor) {
this.processorTable.put(messageType, processor);
}
- protected void processReceiveMsg(ChannelHandlerContext ctx,
ClusterMsg.Message message) {
- if (ClusterMsg.Direction.REQUEST.equals(message.getDirection())) {
+ protected void processReceiveMsg(ChannelHandlerContext ctx, ClusterMessage
message) {
+ if (ClusterMessage.Direction.REQUEST.equals(message.getDirection())) {
this.processRequestMsg(ctx, message);
} else {
this.processResponseMsg(ctx, message);
}
}
- protected void processRequestMsg(ChannelHandlerContext ctx,
ClusterMsg.Message request) {
+ protected void processRequestMsg(ChannelHandlerContext ctx, ClusterMessage
request) {
this.doBeforeRequest(ctx, request);
-
NettyRemotingProcessor processor =
this.processorTable.get(request.getType());
if (processor == null) {
log.info("request type {} not supported", request.getType());
return;
}
- ClusterMsg.Message response = processor.handle(ctx, request);
+ ClusterMessage response = processor.handle(ctx, request);
if (response != null) {
ctx.writeAndFlush(response);
}
}
- private void doBeforeRequest(ChannelHandlerContext ctx, ClusterMsg.Message
request) {
+ private void doBeforeRequest(ChannelHandlerContext ctx, ClusterMessage
request) {
if (CollectionUtils.isEmpty(this.nettyHookList)) {
return;
}
@@ -86,7 +85,7 @@ public abstract class NettyRemotingAbstract implements
RemotingService {
}
}
- protected void processResponseMsg(ChannelHandlerContext ctx,
ClusterMsg.Message response) {
+ protected void processResponseMsg(ChannelHandlerContext ctx,
ClusterMessage response) {
// for sync response
if (this.responseTable.containsKey(response.getIdentity())) {
ResponseFuture responseFuture =
this.responseTable.get(response.getIdentity());
@@ -95,15 +94,15 @@ public abstract class NettyRemotingAbstract implements
RemotingService {
// async response
NettyRemotingProcessor processor =
this.processorTable.get(response.getType());
if (processor != null) {
- ClusterMsg.Message repMessage = processor.handle(ctx,
response);
+ ClusterMessage repMessage = processor.handle(ctx, response);
if (repMessage != null) {
ctx.writeAndFlush(repMessage);
}
- }
+ }
}
}
- protected void sendMsgImpl(final Channel channel, final ClusterMsg.Message
request) {
+ protected void sendMsgImpl(final Channel channel, final ClusterMessage
request) {
channel.writeAndFlush(request).addListener(future -> {
if (!future.isSuccess()) {
log.warn("send request message failed. address: {}, ",
channel.remoteAddress(), future.cause());
@@ -111,7 +110,7 @@ public abstract class NettyRemotingAbstract implements
RemotingService {
});
}
- protected ClusterMsg.Message sendMsgSyncImpl(final Channel channel, final
ClusterMsg.Message request, final int timeoutMillis) {
+ protected ClusterMessage sendMsgSyncImpl(final Channel channel, final
ClusterMessage request, final int timeoutMillis) {
final String identity = request.getIdentity();
try {
@@ -123,7 +122,7 @@ public abstract class NettyRemotingAbstract implements
RemotingService {
log.warn("send request message failed. request: {},
address: {}, ", request, channel.remoteAddress(), future.cause());
}
});
- ClusterMsg.Message response =
responseFuture.waitResponse(timeoutMillis);
+ ClusterMessage response =
responseFuture.waitResponse(timeoutMillis);
if (response == null) {
log.warn("get response message failed, message is null");
}
@@ -155,4 +154,4 @@ public abstract class NettyRemotingAbstract implements
RemotingService {
&& Epoll.isAvailable();
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
index a01c7fab80..7c6233f454 100644
---
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -29,21 +29,18 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.compression.ZlibCodecFactory;
-import io.netty.handler.codec.compression.ZlibWrapper;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufEncoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
import java.util.concurrent.ThreadFactory;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.CommonThreadPool;
import org.apache.hertzbeat.remoting.RemotingClient;
import org.apache.hertzbeat.remoting.event.NettyEventListener;
+import org.apache.hertzbeat.remoting.netty.codec.ForyCodec;
/**
- * Derived from Apache Rocketmq
org.apache.rocketmq.remoting.netty.NettyRemotingClient
+ * Derived from Apache Rocketmq
org.apache.rocketmq.remoting.netty.NettyRemotingClient
* netty client
* @see <a
href="https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java">NettyRemotingClient</a>
*/
@@ -51,7 +48,7 @@ import org.apache.hertzbeat.remoting.event.NettyEventListener;
public class NettyRemotingClient extends NettyRemotingAbstract implements
RemotingClient {
private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4,
Runtime.getRuntime().availableProcessors());
-
+
private final NettyClientConfig nettyClientConfig;
private final CommonThreadPool threadPool;
@@ -81,7 +78,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
.setDaemon(true)
.setNameFormat("netty-client-worker-%d")
.build();
- String envThreadNum =
System.getProperty("hertzbeat.client.worker.thread.num");
+ String envThreadNum =
System.getProperty("hertzbeat.client.worker.thread.num");
int workerThreadNum = envThreadNum != null ?
Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM;
this.workerGroup = new NioEventLoopGroup(workerThreadNum,
threadFactory);
this.bootstrap.group(workerGroup)
@@ -96,7 +93,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
this.channel = null;
boolean first = true;
- while (!Thread.currentThread().isInterrupted()
+ while (!Thread.currentThread().isInterrupted()
&& (first || this.channel == null ||
!this.channel.isActive())) {
first = false;
try {
@@ -121,14 +118,11 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
private void initChannel(final SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
- // zip
- pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
- pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
- // protocol buf encode decode
- pipeline.addLast(new ProtobufVarint32FrameDecoder());
- pipeline.addLast(new
ProtobufDecoder(ClusterMsg.Message.getDefaultInstance()));
- pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
- pipeline.addLast(new ProtobufEncoder());
+ // max frame length 10MB
+ pipeline.addLast(new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0,
4, 0, 4));
+ pipeline.addLast(new LengthFieldPrepender(4));
+ // fory codec
+ pipeline.addLast(new ForyCodec());
pipeline.addLast(new NettyClientHandler());
}
@@ -155,16 +149,16 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
@Override
- public void sendMsg(final ClusterMsg.Message request) {
+ public void sendMsg(final ClusterMessage request) {
this.sendMsgImpl(this.channel, request);
}
@Override
- public ClusterMsg.Message sendMsgSync(ClusterMsg.Message request, int
timeoutMillis) {
+ public ClusterMessage sendMsgSync(ClusterMessage request, int
timeoutMillis) {
return this.sendMsgSyncImpl(this.channel, request, timeoutMillis);
}
- class NettyClientHandler extends
SimpleChannelInboundHandler<ClusterMsg.Message> {
+ class NettyClientHandler extends
SimpleChannelInboundHandler<ClusterMessage> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -172,7 +166,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx,
ClusterMsg.Message msg) throws Exception {
+ protected void channelRead0(ChannelHandlerContext ctx, ClusterMessage
msg) throws Exception {
NettyRemotingClient.this.processReceiveMsg(ctx, msg);
}
@@ -181,4 +175,4 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
NettyRemotingClient.this.channelIdle(ctx, evt);
}
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingProcessor.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingProcessor.java
index 27c68dc1ff..12e7739db6 100644
---
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingProcessor.java
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingProcessor.java
@@ -18,7 +18,7 @@
package org.apache.hertzbeat.remoting.netty;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
/**
* Derived from Apache Rocketmq
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
@@ -27,6 +27,6 @@ import org.apache.hertzbeat.common.entity.message.ClusterMsg;
*/
public interface NettyRemotingProcessor {
- ClusterMsg.Message handle(ChannelHandlerContext ctx, ClusterMsg.Message
message);
+ ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage message);
}
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
index 48062fadb9..63eb567e8f 100644
---
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -32,25 +32,22 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.compression.ZlibCodecFactory;
-import io.netty.handler.codec.compression.ZlibWrapper;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufEncoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.CommonThreadPool;
import org.apache.hertzbeat.remoting.RemotingServer;
import org.apache.hertzbeat.remoting.event.NettyEventListener;
+import org.apache.hertzbeat.remoting.netty.codec.ForyCodec;
/**
- * Derived from Apache Rocketmq
org.apache.rocketmq.remoting.netty.NettyRemotingServer
+ * Derived from Apache Rocketmq
org.apache.rocketmq.remoting.netty.NettyRemotingServer
* netty server
* @see <a
href="https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java">NettyRemotingServer</a>
*/
@@ -137,14 +134,11 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
private void initChannel(final SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
- // zip
- pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
- pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
- // protocol buf encode decode
- pipeline.addLast(new ProtobufVarint32FrameDecoder());
- pipeline.addLast(new
ProtobufDecoder(ClusterMsg.Message.getDefaultInstance()));
- pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
- pipeline.addLast(new ProtobufEncoder());
+ // fory codec
+ // max frame length 10MB
+ pipeline.addLast(new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0,
4, 0, 4));
+ pipeline.addLast(new LengthFieldPrepender(4));
+ pipeline.addLast(new ForyCodec());
// idle state
pipeline.addLast(new IdleStateHandler(0, 0,
nettyServerConfig.getIdleStateEventTriggerTime()));
pipeline.addLast(new NettyServerHandler());
@@ -165,12 +159,12 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
@Override
- public void sendMsg(final Channel channel, final ClusterMsg.Message
request) {
+ public void sendMsg(final Channel channel, final ClusterMessage request) {
this.sendMsgImpl(channel, request);
}
@Override
- public ClusterMsg.Message sendMsgSync(final Channel channel, final
ClusterMsg.Message request, final int timeoutMillis) {
+ public ClusterMessage sendMsgSync(final Channel channel, final
ClusterMessage request, final int timeoutMillis) {
return this.sendMsgSyncImpl(channel, request, timeoutMillis);
}
@@ -183,7 +177,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
* netty server handler
*/
@ChannelHandler.Sharable
- public class NettyServerHandler extends
SimpleChannelInboundHandler<ClusterMsg.Message> {
+ public class NettyServerHandler extends
SimpleChannelInboundHandler<ClusterMessage> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -191,7 +185,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx,
ClusterMsg.Message msg) throws Exception {
+ protected void channelRead0(ChannelHandlerContext ctx, ClusterMessage
msg) {
NettyRemotingServer.this.processReceiveMsg(ctx, msg);
}
@@ -200,4 +194,4 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
NettyRemotingServer.this.channelIdle(ctx, evt);
}
}
-}
+}
\ No newline at end of file
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/ResponseFuture.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/ResponseFuture.java
index 3a9a38295e..34ecbf2a1a 100644
---
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/ResponseFuture.java
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/ResponseFuture.java
@@ -19,7 +19,7 @@ package org.apache.hertzbeat.remoting.netty;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
/**
* netty response future
@@ -28,14 +28,14 @@ public class ResponseFuture {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
- private ClusterMsg.Message response;
+ private ClusterMessage response;
- public ClusterMsg.Message waitResponse(final long timeoutMillis) throws
InterruptedException {
+ public ClusterMessage waitResponse(final long timeoutMillis) throws
InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.response;
}
- public void putResponse(final ClusterMsg.Message response) {
+ public void putResponse(final ClusterMessage response) {
this.response = response;
this.countDownLatch.countDown();
}
diff --git
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/codec/ForyCodec.java
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/codec/ForyCodec.java
new file mode 100644
index 0000000000..ad8cde8554
--- /dev/null
+++
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/codec/ForyCodec.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.remoting.netty.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageCodec;
+import org.apache.fory.Fory;
+import org.apache.fory.ThreadSafeFory;
+import org.apache.fory.config.Language;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
+
+import java.util.List;
+
+/**
+ * Netty codec for ClusterMessage using Apache fory
+ */
+public class ForyCodec extends ByteToMessageCodec<ClusterMessage> {
+
+ private static final ThreadSafeFory fory;
+
+ static {
+ // 1. Initialize Fory in XLANG mode
+ fory = Fory.builder()
+ .withLanguage(Language.XLANG)
+ .requireClassRegistration(false)
+ .buildThreadSafeFory();
+
+ // 2. Register classes with specific names for Cross-Language
compatibility (Java <-> Go)
+ // These names (e.g., "ClusterMessage") must match the registration
name in the Go client.
+ fory.register(ClusterMessage.class, "ClusterMessage");
+ fory.register(ClusterMessage.MessageType.class, "MessageType");
+ fory.register(ClusterMessage.Direction.class, "Direction");
+ }
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, ClusterMessage msg,
ByteBuf out) {
+ byte[] bytes = fory.serialize(msg);
+ out.writeBytes(bytes);
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) {
+ byte[] bytes = new byte[in.readableBytes()];
+ in.readBytes(bytes);
+ Object obj = fory.deserialize(bytes);
+ out.add(obj);
+ }
+}
\ No newline at end of file
diff --git
a/hertzbeat-remoting/src/test/java/org/apache/hertzbeat/remoting/RemotingServiceTest.java
b/hertzbeat-remoting/src/test/java/org/apache/hertzbeat/remoting/RemotingServiceTest.java
index 4a852d99d8..cebcf37380 100644
---
a/hertzbeat-remoting/src/test/java/org/apache/hertzbeat/remoting/RemotingServiceTest.java
+++
b/hertzbeat-remoting/src/test/java/org/apache/hertzbeat/remoting/RemotingServiceTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,8 +17,8 @@
package org.apache.hertzbeat.remoting;
-import com.google.protobuf.ByteString;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import java.nio.charset.StandardCharsets;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
import org.apache.hertzbeat.common.support.CommonThreadPool;
import org.apache.hertzbeat.remoting.netty.NettyClientConfig;
import org.apache.hertzbeat.remoting.netty.NettyRemotingClient;
@@ -102,15 +102,15 @@ public class RemotingServiceTest {
public void testSendMsg() {
final String msg = "hello world";
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, (ctx,
message) -> {
- Assertions.assertEquals(msg, message.getMsg().toStringUtf8());
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.HEARTBEAT,
(ctx, message) -> {
+ Assertions.assertEquals(msg, message.getMsgString());
return null;
});
- ClusterMsg.Message request = ClusterMsg.Message.newBuilder()
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setType(ClusterMsg.MessageType.HEARTBEAT)
- .setMsg(ByteString.copyFromUtf8(msg))
+ ClusterMessage request = ClusterMessage.builder()
+ .direction(ClusterMessage.Direction.REQUEST)
+ .type(ClusterMessage.MessageType.HEARTBEAT)
+ .msg(msg.getBytes(StandardCharsets.UTF_8))
.build();
this.remotingClient.sendMsg(request);
}
@@ -120,40 +120,42 @@ public class RemotingServiceTest {
final String requestMsg = "request";
final String responseMsg = "response";
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, (ctx,
message) -> {
- Assertions.assertEquals(requestMsg,
message.getMsg().toStringUtf8());
- return ClusterMsg.Message.newBuilder()
- .setDirection(ClusterMsg.Direction.RESPONSE)
- .setMsg(ByteString.copyFromUtf8(responseMsg))
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.HEARTBEAT,
(ctx, message) -> {
+ Assertions.assertEquals(requestMsg, message.getMsgString());
+ return ClusterMessage.builder()
+ .direction(ClusterMessage.Direction.RESPONSE)
+ .type(ClusterMessage.MessageType.HEARTBEAT)
+ .msg(responseMsg.getBytes(StandardCharsets.UTF_8))
.build();
});
- ClusterMsg.Message request = ClusterMsg.Message.newBuilder()
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setType(ClusterMsg.MessageType.HEARTBEAT)
- .setMsg(ByteString.copyFromUtf8(requestMsg))
+ ClusterMessage request = ClusterMessage.builder()
+ .direction(ClusterMessage.Direction.REQUEST)
+ .type(ClusterMessage.MessageType.HEARTBEAT)
+ .msg(requestMsg.getBytes(StandardCharsets.UTF_8))
.build();
- ClusterMsg.Message response = this.remotingClient.sendMsgSync(request,
3000);
- Assertions.assertEquals(responseMsg, response.getMsg().toStringUtf8());
+ ClusterMessage response = this.remotingClient.sendMsgSync(request,
3000);
+ Assertions.assertEquals(responseMsg, response.getMsgString());
}
@Test
public void testNettyHook() {
this.remotingServer.registerHook(Lists.newArrayList(
- (ctx, message) -> Assertions.assertEquals("hello world",
message.getMsg().toStringUtf8())
+ (ctx, message) -> Assertions.assertEquals("hello world",
message.getMsgString())
));
-
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, (ctx,
message) ->
- ClusterMsg.Message.newBuilder()
- .setDirection(ClusterMsg.Direction.RESPONSE)
+
this.remotingServer.registerProcessor(ClusterMessage.MessageType.HEARTBEAT,
(ctx, message) ->
+ ClusterMessage.builder()
+ .direction(ClusterMessage.Direction.RESPONSE)
+ .type(ClusterMessage.MessageType.HEARTBEAT)
.build());
- ClusterMsg.Message request = ClusterMsg.Message.newBuilder()
- .setDirection(ClusterMsg.Direction.REQUEST)
- .setType(ClusterMsg.MessageType.HEARTBEAT)
- .setMsg(ByteString.copyFromUtf8("hello world"))
+ ClusterMessage request = ClusterMessage.builder()
+ .direction(ClusterMessage.Direction.REQUEST)
+ .type(ClusterMessage.MessageType.HEARTBEAT)
+ .msg("hello world".getBytes(StandardCharsets.UTF_8))
.build();
this.remotingClient.sendMsg(request);
}
-}
+}
\ No newline at end of file
diff --git a/material/licenses/backend/LICENSE
b/material/licenses/backend/LICENSE
index 826b50efca..1234a692ad 100644
--- a/material/licenses/backend/LICENSE
+++ b/material/licenses/backend/LICENSE
@@ -437,6 +437,7 @@ The text of each license is the standard Apache 2.0 license.
https://mvnrepository.com/artifact/com.vesoft/client/3.6.0
https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.9.3
Apache-2.0
https://mvnrepository.com/artifact/io.opentelemetry.proto/opentelemetry-proto/1.7.0-alpha
Apache-2.0
+ https://mvnrepository.com/artifact/org.apache.fory/fory-core/0.13.1
Apache-2.0
========================================================================
diff --git a/pom.xml b/pom.xml
index 4196824b5b..75ed3da453 100644
--- a/pom.xml
+++ b/pom.xml
@@ -177,6 +177,7 @@
<opentelemetry-starter.version>2.22.0</opentelemetry-starter.version>
<opentelemetry-logback.version>2.22.0-alpha</opentelemetry-logback.version>
<opentelemetry-proto.version>1.9.0-alpha</opentelemetry-proto.version>
+ <fory.version>0.13.1</fory.version>
</properties>
<dependencyManagement>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]