This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch dev-2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/dev-2.0.0 by this push:
     new b9f1d65cb5 [refactor] Apache Fory (#3873)
b9f1d65cb5 is described below

commit b9f1d65cb58a6ef0e6832741eb27557e70fc3dfc
Author: Logic <[email protected]>
AuthorDate: Thu Nov 27 17:28:44 2025 +0800

    [refactor] Apache Fory (#3873)
    
    Signed-off-by: Logic <[email protected]>
    Co-authored-by: Duansg <[email protected]>
---
 .../dispatch/entrance/CollectServerTest.java       |   6 +-
 .../entrance/internal/CollectJobServiceTest.java   |   6 +-
 .../collector/dispatch/entrance/CollectServer.java |  43 ++++----
 .../entrance/internal/CollectJobService.java       |  35 +++----
 .../processor/CollectCyclicDataProcessor.java      |   6 +-
 .../processor/CollectOneTimeDataProcessor.java     |   6 +-
 .../processor/DeleteCyclicTaskProcessor.java       |   6 +-
 .../entrance/processor/GoCloseProcessor.java       |   8 +-
 .../entrance/processor/GoOfflineProcessor.java     |  19 ++--
 .../entrance/processor/GoOnlineProcessor.java      |  28 +++--
 .../entrance/processor/HeartbeatProcessor.java     |   4 +-
 hertzbeat-common/pom.xml                           |  11 ++
 .../common/entity/message/ClusterMessage.java      | 114 +++++++++++++++++++++
 .../apache/hertzbeat/common/util/ArrowUtil.java    |  13 ++-
 .../org/apache/hertzbeat/common/util/JsonUtil.java |  42 +++++++-
 .../manager/scheduler/CollectorJobScheduler.java   |  87 ++++++++--------
 .../manager/scheduler/netty/ManageServer.java      |  20 ++--
 .../CollectCyclicDataResponseProcessor.java        |  10 +-
 ...yclicServiceDiscoveryDataResponseProcessor.java |   6 +-
 .../CollectOneTimeDataResponseProcessor.java       |   6 +-
 .../netty/process/CollectorOfflineProcessor.java   |   4 +-
 .../netty/process/CollectorOnlineProcessor.java    |  17 ++-
 .../netty/process/HeartbeatProcessor.java          |  10 +-
 .../scheduler/CollectorJobSchedulerTest.java       |   8 +-
 .../apache/hertzbeat/remoting/RemotingClient.java  |   8 +-
 .../apache/hertzbeat/remoting/RemotingServer.java  |   8 +-
 .../apache/hertzbeat/remoting/netty/NettyHook.java |   4 +-
 .../remoting/netty/NettyRemotingAbstract.java      |  35 +++----
 .../remoting/netty/NettyRemotingClient.java        |  44 ++++----
 .../remoting/netty/NettyRemotingProcessor.java     |   4 +-
 .../remoting/netty/NettyRemotingServer.java        |  38 +++----
 .../hertzbeat/remoting/netty/ResponseFuture.java   |   8 +-
 .../hertzbeat/remoting/netty/codec/ForyCodec.java  |  64 ++++++++++++
 .../hertzbeat/remoting/RemotingServiceTest.java    |  60 +++++------
 material/licenses/backend/LICENSE                  |   1 +
 pom.xml                                            |   1 +
 36 files changed, 501 insertions(+), 289 deletions(-)

diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServerTest.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServerTest.java
index 3112e1e5ba..a4123c80c7 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServerTest.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServerTest.java
@@ -29,7 +29,7 @@ import 
org.apache.hertzbeat.collector.dispatch.CollectorInfoProperties;
 import org.apache.hertzbeat.collector.dispatch.DispatchProperties;
 import 
org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectJobService;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.CommonThreadPool;
 import org.apache.hertzbeat.remoting.RemotingClient;
 import org.junit.jupiter.api.BeforeEach;
@@ -113,7 +113,7 @@ class CollectServerTest {
 
         RemotingClient remotingClient = mock(RemotingClient.class);
         ReflectionTestUtils.setField(collectServer, "remotingClient", 
remotingClient);
-        ClusterMsg.Message message = mock(ClusterMsg.Message.class);
+        ClusterMessage message = mock(ClusterMessage.class);
 
         collectServer.sendMsg(message);
 
@@ -135,7 +135,7 @@ class CollectServerTest {
         collectNettyEventListener.onChannelActive(channel);
 
         verify(timerDispatch, times(1)).goOnline();
-        verify(remotingClient, 
times(1)).sendMsg(any(ClusterMsg.Message.class));
+        verify(remotingClient, times(1)).sendMsg(any(ClusterMessage.class));
 
         ScheduledExecutorService scheduledExecutor =
                 (ScheduledExecutorService) 
ReflectionTestUtils.getField(collectServer, "scheduledExecutor");
diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobServiceTest.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobServiceTest.java
index 574103b0a8..125eded7dd 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobServiceTest.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobServiceTest.java
@@ -34,7 +34,7 @@ import org.apache.hertzbeat.collector.dispatch.WorkerPool;
 import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
 import org.apache.hertzbeat.common.entity.job.Job;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -116,7 +116,7 @@ class CollectJobServiceTest {
 
         collectJobService.collectSyncOneTimeJobData(job);
 
-        verify(collectServer, times(1)).sendMsg(any(ClusterMsg.Message.class));
+        verify(collectServer, times(1)).sendMsg(any(ClusterMessage.class));
     }
 
     @Test
@@ -137,7 +137,7 @@ class CollectJobServiceTest {
                 .build();
         collectJobService.sendAsyncCollectData(metricsData);
 
-        verify(collectServer, times(1)).sendMsg(any(ClusterMsg.Message.class));
+        verify(collectServer, times(1)).sendMsg(any(ClusterMessage.class));
     }
 
     @Test
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java
index 9b34a11d4b..6fd62e04ec 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/CollectServer.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,6 @@
 package org.apache.hertzbeat.collector.dispatch.entrance;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
 import io.netty.channel.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.dispatch.CollectorInfoProperties;
@@ -33,7 +32,7 @@ import 
org.apache.hertzbeat.collector.dispatch.entrance.processor.GoOnlineProces
 import 
org.apache.hertzbeat.collector.dispatch.entrance.processor.HeartbeatProcessor;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
 import org.apache.hertzbeat.common.entity.dto.CollectorInfo;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.CommonThreadPool;
 import org.apache.hertzbeat.common.util.JsonUtil;
 import org.apache.hertzbeat.remoting.RemotingClient;
@@ -65,7 +64,7 @@ public class CollectServer implements CommandLineRunner {
     private final TimerDispatch timerDispatch;
 
     private final CollectorInfoProperties infoProperties;
-    
+
     private RemotingClient remotingClient;
 
     private ScheduledExecutorService scheduledExecutor;
@@ -97,13 +96,13 @@ public class CollectServer implements CommandLineRunner {
         nettyClientConfig.setServerPort(nettyProperties.getManagerPort());
         this.remotingClient = new NettyRemotingClient(nettyClientConfig, new 
CollectNettyEventListener(), threadPool);
 
-        
this.remotingClient.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, new 
HeartbeatProcessor());
-        
this.remotingClient.registerProcessor(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK, 
new CollectCyclicDataProcessor(this));
-        
this.remotingClient.registerProcessor(ClusterMsg.MessageType.DELETE_CYCLIC_TASK,
 new DeleteCyclicTaskProcessor(this));
-        
this.remotingClient.registerProcessor(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK,
 new CollectOneTimeDataProcessor(this));
-        
this.remotingClient.registerProcessor(ClusterMsg.MessageType.GO_OFFLINE, new 
GoOfflineProcessor());
-        
this.remotingClient.registerProcessor(ClusterMsg.MessageType.GO_ONLINE, new 
GoOnlineProcessor());
-        this.remotingClient.registerProcessor(ClusterMsg.MessageType.GO_CLOSE, 
new GoCloseProcessor(this));
+        
this.remotingClient.registerProcessor(ClusterMessage.MessageType.HEARTBEAT, new 
HeartbeatProcessor());
+        
this.remotingClient.registerProcessor(ClusterMessage.MessageType.ISSUE_CYCLIC_TASK,
 new CollectCyclicDataProcessor(this));
+        
this.remotingClient.registerProcessor(ClusterMessage.MessageType.DELETE_CYCLIC_TASK,
 new DeleteCyclicTaskProcessor(this));
+        
this.remotingClient.registerProcessor(ClusterMessage.MessageType.ISSUE_ONE_TIME_TASK,
 new CollectOneTimeDataProcessor(this));
+        
this.remotingClient.registerProcessor(ClusterMessage.MessageType.GO_OFFLINE, 
new GoOfflineProcessor());
+        
this.remotingClient.registerProcessor(ClusterMessage.MessageType.GO_ONLINE, new 
GoOnlineProcessor());
+        
this.remotingClient.registerProcessor(ClusterMessage.MessageType.GO_CLOSE, new 
GoCloseProcessor(this));
     }
 
     public void shutdown() {
@@ -116,7 +115,7 @@ public class CollectServer implements CommandLineRunner {
         return collectJobService;
     }
 
-    public void sendMsg(final ClusterMsg.Message message) {
+    public void sendMsg(final ClusterMessage message) {
         this.remotingClient.sendMsg(message);
     }
 
@@ -143,10 +142,10 @@ public class CollectServer implements CommandLineRunner {
                     .build();
             timerDispatch.goOnline();
             // send online message
-            ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                    .setIdentity(identity)
-                    .setType(ClusterMsg.MessageType.GO_ONLINE)
-                    
.setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(collectorInfo)))
+            ClusterMessage message = ClusterMessage.builder()
+                    .identity(identity)
+                    .type(ClusterMessage.MessageType.GO_ONLINE)
+                    .msg(JsonUtil.toJsonBytes(collectorInfo))
                     .build();
             CollectServer.this.sendMsg(message);
 
@@ -163,13 +162,13 @@ public class CollectServer implements CommandLineRunner {
                 // schedule send heartbeat message
                 scheduledExecutor.scheduleAtFixedRate(() -> {
                     try {
-                        ClusterMsg.Message heartbeat = 
ClusterMsg.Message.newBuilder()
-                                .setIdentity(identity)
-                                .setDirection(ClusterMsg.Direction.REQUEST)
-                                .setType(ClusterMsg.MessageType.HEARTBEAT)
+                        ClusterMessage heartbeat = ClusterMessage.builder()
+                                .identity(identity)
+                                .direction(ClusterMessage.Direction.REQUEST)
+                                .type(ClusterMessage.MessageType.HEARTBEAT)
                                 .build();
                         CollectServer.this.sendMsg(heartbeat);
-                        log.info("collector send cluster server heartbeat, 
time: {}.", System.currentTimeMillis());   
+                        log.info("collector send cluster server heartbeat, 
time: {}.", System.currentTimeMillis());
                     } catch (Exception e) {
                         log.error("schedule send heartbeat to server 
error.{}", e.getMessage());
                     }
@@ -182,4 +181,4 @@ public class CollectServer implements CommandLineRunner {
             log.info("handle idle event triggered. collector is going 
offline.");
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobService.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobService.java
index 616abdbfe5..247a06a3ee 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobService.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/internal/CollectJobService.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,6 @@
 
 package org.apache.hertzbeat.collector.dispatch.entrance.internal;
 
-import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.dispatch.DispatchProperties;
 import org.apache.hertzbeat.collector.dispatch.WorkerPool;
@@ -25,7 +24,7 @@ import 
org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
 import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.job.Job;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.util.ArrowUtil;
 import org.apache.hertzbeat.common.util.IpDomainUtil;
@@ -117,10 +116,10 @@ public class CollectJobService {
         workerPool.executeJob(() -> {
             List<CollectRep.MetricsData> metricsDataList = 
this.collectSyncJobData(oneTimeJob);
             byte[] msg = ArrowUtil.serializeMetricsData(metricsDataList);
-            ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                    .setMsg(ByteString.copyFrom(msg))
-                    .setDirection(ClusterMsg.Direction.REQUEST)
-                    
.setType(ClusterMsg.MessageType.RESPONSE_ONE_TIME_TASK_DATA)
+            ClusterMessage message = ClusterMessage.builder()
+                    .msg(msg)
+                    .direction(ClusterMessage.Direction.REQUEST)
+                    
.type(ClusterMessage.MessageType.RESPONSE_ONE_TIME_TASK_DATA)
                     .build();
             this.collectServer.sendMsg(message);
         });
@@ -153,22 +152,22 @@ public class CollectJobService {
      */
     public void sendAsyncCollectData(CollectRep.MetricsData metricsData) {
         byte[] msg = ArrowUtil.serializeMetricsData(List.of(metricsData));
-        ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                .setIdentity(collectorIdentity)
-                .setMsg(ByteString.copyFrom(msg))
-                .setDirection(ClusterMsg.Direction.REQUEST)
-                .setType(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_DATA)
+        ClusterMessage message = ClusterMessage.builder()
+                .identity(collectorIdentity)
+                .msg(msg)
+                .direction(ClusterMessage.Direction.REQUEST)
+                .type(ClusterMessage.MessageType.RESPONSE_CYCLIC_TASK_DATA)
                 .build();
         this.collectServer.sendMsg(message);
     }
 
     public void sendAsyncServiceDiscoveryData(CollectRep.MetricsData 
metricsData) {
         byte[] msg = ArrowUtil.serializeMetricsData(List.of(metricsData));
-        ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                .setIdentity(collectorIdentity)
-                .setMsg(ByteString.copyFrom(msg))
-                .setDirection(ClusterMsg.Direction.REQUEST)
-                .setType(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA)
+        ClusterMessage message = ClusterMessage.builder()
+                .identity(collectorIdentity)
+                .msg(msg)
+                .direction(ClusterMessage.Direction.REQUEST)
+                .type(ClusterMessage.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA)
                 .build();
         this.collectServer.sendMsg(message);
     }
@@ -184,4 +183,4 @@ public class CollectJobService {
     public void setCollectServer(CollectServer collectServer) {
         this.collectServer = collectServer;
     }
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectCyclicDataProcessor.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectCyclicDataProcessor.java
index f37f71d850..cd01bcdf38 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectCyclicDataProcessor.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectCyclicDataProcessor.java
@@ -21,7 +21,7 @@ import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
 import org.apache.hertzbeat.common.entity.job.Job;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.util.JsonUtil;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
@@ -37,8 +37,8 @@ public class CollectCyclicDataProcessor implements 
NettyRemotingProcessor {
     }
 
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
-        Job job = JsonUtil.fromJson(message.getMsg().toStringUtf8(), 
Job.class);
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
+        Job job = JsonUtil.fromJson(message.getMsg(), Job.class);
         if (job == null) {
             log.error("collector receive cyclic task job is null");
             return null;
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectOneTimeDataProcessor.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectOneTimeDataProcessor.java
index 74f3d526cd..49cba7e611 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectOneTimeDataProcessor.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/CollectOneTimeDataProcessor.java
@@ -20,7 +20,7 @@ package 
org.apache.hertzbeat.collector.dispatch.entrance.processor;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
 import org.apache.hertzbeat.common.entity.job.Job;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.util.JsonUtil;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
@@ -35,8 +35,8 @@ public class CollectOneTimeDataProcessor implements 
NettyRemotingProcessor {
     }
 
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
-        Job oneTimeJob = JsonUtil.fromJson(message.getMsg().toStringUtf8(), 
Job.class);
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
+        Job oneTimeJob = JsonUtil.fromJson(message.getMsg(), Job.class);
         
collectServer.getCollectJobService().collectSyncOneTimeJobData(oneTimeJob);
         return null;
     }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/DeleteCyclicTaskProcessor.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/DeleteCyclicTaskProcessor.java
index 0499a8b2e0..b715dd6240 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/DeleteCyclicTaskProcessor.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/DeleteCyclicTaskProcessor.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.util.JsonUtil;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
@@ -39,9 +39,9 @@ public class DeleteCyclicTaskProcessor implements 
NettyRemotingProcessor {
     }
 
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         TypeReference<List<Long>> typeReference = new TypeReference<>() {};
-        List<Long> jobIds = JsonUtil.fromJson(message.getMsg().toStringUtf8(), 
typeReference);
+        List<Long> jobIds = JsonUtil.fromJson(message.getMsgString(), 
typeReference);
         if (jobIds == null || jobIds.isEmpty()) {
             log.error("collector receive delete cyclic task job ids is null");
             return null;
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoCloseProcessor.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoCloseProcessor.java
index 4b068f1e45..9b2674ec31 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoCloseProcessor.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoCloseProcessor.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.dispatch.entrance.CollectServer;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
 import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.SpringContextHolder;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 import org.springframework.boot.SpringApplication;
@@ -41,12 +41,12 @@ public class GoCloseProcessor implements 
NettyRemotingProcessor {
     }
 
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         if (this.timerDispatch == null) {
             this.timerDispatch = 
SpringContextHolder.getBean(TimerDispatch.class);
         }
-        if 
(message.getMsg().toStringUtf8().contains(CommonConstants.COLLECTOR_AUTH_FAILED))
 {
-            log.error("[Auth Failed]receive client auth failed message and go 
close. {}", message.getMsg());
+        if 
(message.getMsgString().contains(CommonConstants.COLLECTOR_AUTH_FAILED)) {
+            log.error("[Auth Failed]receive client auth failed message and go 
close. {}", message.getMsgString());
         }
         this.timerDispatch.goOffline();
         this.collectServer.shutdown();
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOfflineProcessor.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOfflineProcessor.java
index aad85194ee..c3200b4b29 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOfflineProcessor.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOfflineProcessor.java
@@ -17,12 +17,12 @@
 
 package org.apache.hertzbeat.collector.dispatch.entrance.processor;
 
-import com.google.protobuf.ByteString;
 import io.netty.channel.ChannelHandlerContext;
+import java.nio.charset.StandardCharsets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
 import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.SpringContextHolder;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
@@ -36,20 +36,21 @@ public class GoOfflineProcessor implements 
NettyRemotingProcessor {
     private TimerDispatch timerDispatch;
     
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         if (this.timerDispatch == null) {
             this.timerDispatch = 
SpringContextHolder.getBean(TimerDispatch.class);
         }
         timerDispatch.goOffline();
         log.info("receive offline message and handle success");
-        if 
(message.getMsg().toStringUtf8().contains(CommonConstants.COLLECTOR_AUTH_FAILED))
 {
-            log.error("[Auth Failed]receive client auth failed message and go 
offline. {}", message.getMsg());
+        if 
(message.getMsgString().contains(CommonConstants.COLLECTOR_AUTH_FAILED)) {
+            log.error("[Auth Failed]receive client auth failed message and go 
offline. {}", message.getMsgString());
             return null;
         }
-        return ClusterMsg.Message.newBuilder()
-                .setIdentity(message.getIdentity())
-                .setDirection(ClusterMsg.Direction.RESPONSE)
-                
.setMsg(ByteString.copyFromUtf8(String.valueOf(CommonConstants.SUCCESS_CODE)))
+        return ClusterMessage.builder()
+                .identity(message.getIdentity())
+                .direction(ClusterMessage.Direction.RESPONSE)
+                .type(ClusterMessage.MessageType.GO_OFFLINE)
+                
.msg(String.valueOf(CommonConstants.SUCCESS_CODE).getBytes(StandardCharsets.UTF_8))
                 .build();
     }
 }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
index 34dda15197..e7a2fd3344 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/GoOnlineProcessor.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,11 @@
 
 package org.apache.hertzbeat.collector.dispatch.entrance.processor;
 
-import com.google.protobuf.ByteString;
 import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
-import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.dto.ServerInfo;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.SpringContextHolder;
 import org.apache.hertzbeat.common.util.AesUtil;
 import org.apache.hertzbeat.common.util.JsonUtil;
@@ -35,18 +33,20 @@ import 
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
  */
 @Slf4j
 public class GoOnlineProcessor implements NettyRemotingProcessor {
-    
+
     private TimerDispatch timerDispatch;
-    
+
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         if (this.timerDispatch == null) {
             this.timerDispatch = 
SpringContextHolder.getBean(TimerDispatch.class);
         }
-        if (message.getMsg().isEmpty()) {
+        String msgString = message.getMsgString();
+        if (message.getMsg() == null || msgString == null || 
msgString.isEmpty()) {
             log.warn("The message that server response to collector is empty, 
please upgrade server");
         } else {
-            ServerInfo serverInfo = 
JsonUtil.fromJson(message.getMsg().toStringUtf8(), ServerInfo.class);
+            // Use the new JsonUtil.fromJson(byte[], Class) method
+            ServerInfo serverInfo = JsonUtil.fromJson(message.getMsg(), 
ServerInfo.class);
             if (serverInfo == null || serverInfo.getAesSecret() == null) {
                 log.warn("The message that server response to collector has 
not secret empty, please check");
             } else {
@@ -55,10 +55,8 @@ public class GoOnlineProcessor implements 
NettyRemotingProcessor {
         }
         timerDispatch.goOnline();
         log.info("receive online message and handle success");
-        return ClusterMsg.Message.newBuilder()
-                .setIdentity(message.getIdentity())
-                .setDirection(ClusterMsg.Direction.RESPONSE)
-                
.setMsg(ByteString.copyFromUtf8(String.valueOf(CommonConstants.SUCCESS_CODE)))
-                .build();
+        // Return null to stop the ping-pong loop.
+        // The collector should not reply to the server's confirmation 
response.
+        return null;
     }
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/HeartbeatProcessor.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/HeartbeatProcessor.java
index 9fc036cf01..eece81bdd1 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/HeartbeatProcessor.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/entrance/processor/HeartbeatProcessor.java
@@ -19,7 +19,7 @@ package 
org.apache.hertzbeat.collector.dispatch.entrance.processor;
 
 import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
 /**
@@ -28,7 +28,7 @@ import 
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 @Slf4j
 public class HeartbeatProcessor implements NettyRemotingProcessor {
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         log.info("collector receive manager server response heartbeat, time: 
{}. ", System.currentTimeMillis());
         return null;
     }
diff --git a/hertzbeat-common/pom.xml b/hertzbeat-common/pom.xml
index 8b31253382..c273ca7430 100644
--- a/hertzbeat-common/pom.xml
+++ b/hertzbeat-common/pom.xml
@@ -171,6 +171,17 @@
             <version>${javaparser.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.fory</groupId>
+            <artifactId>fory-core</artifactId>
+            <version>${fory.version}</version>
+        </dependency>
+        <!-- row/arrow format support -->
+        <dependency>
+            <groupId>org.apache.fory</groupId>
+            <artifactId>fory-format</artifactId>
+            <version>${fory.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents.client5</groupId>
             <artifactId>httpclient5</artifactId>
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/message/ClusterMessage.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/message/ClusterMessage.java
new file mode 100644
index 0000000000..7db5a69448
--- /dev/null
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/message/ClusterMessage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.entity.message;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.nio.charset.StandardCharsets;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * cluster message entity for fury serialization
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class ClusterMessage implements Serializable {
+
+    /**
+     * collector identity
+     */
+    @Builder.Default
+    private String identity = "";
+
+    /**
+     * message direction
+     */
+    @Builder.Default
+    private Direction direction = Direction.REQUEST;
+
+    /**
+     * message type
+     */
+    private MessageType type;
+
+    /**
+     * message content
+     * Use byte[] to ensure data integrity for both JSON strings (UTF-8) and 
Binary data (Arrow).
+     * Avoids String encoding/decoding issues (Mojibake) across different JVMs 
or Languages.
+     */
+    private byte[] msg;
+
+    @JsonIgnore
+    public String getMsgString() {
+        if (this.msg == null) {
+            return null;
+        }
+        return new String(this.msg, StandardCharsets.UTF_8);
+    }
+
+    @JsonIgnore
+    public void setMsgString(String jsonString) {
+        if (jsonString != null) {
+            this.msg = jsonString.getBytes(StandardCharsets.UTF_8);
+        } else {
+            this.msg = null;
+        }
+    }
+
+    /**
+     * Message Type Enum
+     */
+    public enum MessageType {
+        // heartbeat message
+        HEARTBEAT,
+        // collector go online to master message
+        GO_ONLINE,
+        // collector go offline to master message
+        GO_OFFLINE,
+        // collector go close to master
+        GO_CLOSE,
+        // issue cyclic collect task
+        ISSUE_CYCLIC_TASK,
+        // delete cyclic collect task
+        DELETE_CYCLIC_TASK,
+        // issue one-time collect task
+        ISSUE_ONE_TIME_TASK,
+        // response one-time collect data
+        RESPONSE_ONE_TIME_TASK_DATA,
+        // response cyclic collect data
+        RESPONSE_CYCLIC_TASK_DATA,
+        // response cyclic service discovery data
+        RESPONSE_CYCLIC_TASK_SD_DATA
+    }
+
+    /**
+     * Direction Enum
+     */
+    public enum Direction {
+        // request message
+        REQUEST,
+        // request response
+        RESPONSE
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
index f2a04b6af9..82188a6b08 100644
--- 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -37,7 +37,7 @@ import org.apache.hertzbeat.common.entity.message.CollectRep;
  */
 @Slf4j
 public final class ArrowUtil {
-    
+
     private ArrowUtil() {
     }
 
@@ -53,7 +53,7 @@ public final class ArrowUtil {
     public static byte[] serializeMultipleRoots(List<VectorSchemaRoot> roots) {
         try (ByteArrayOutputStream out = new ByteArrayOutputStream();
              DataOutputStream dataOut = new DataOutputStream(out)) {
-            
+
             dataOut.writeInt(roots.size());
             for (VectorSchemaRoot root : roots) {
                 ArrowStreamWriter writer = new ArrowStreamWriter(
@@ -87,10 +87,10 @@ public final class ArrowUtil {
         List<VectorSchemaRoot> roots = new ArrayList<>();
         try (ByteArrayInputStream in = new ByteArrayInputStream(data);
              DataInputStream dataIn = new DataInputStream(in)) {
-            
+
             int rootCount = dataIn.readInt();
             RootAllocator allocator = new RootAllocator();
-            
+
             for (int i = 0; i < rootCount; i++) {
                 ArrowStreamReader reader = new ArrowStreamReader(
                         Channels.newChannel(in),
@@ -153,5 +153,4 @@ public final class ArrowUtil {
         }
         return serializeMultipleRoots(roots);
     }
-    
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/JsonUtil.java 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/JsonUtil.java
index b7a46e3a03..ca3ce0846f 100644
--- 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/JsonUtil.java
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/JsonUtil.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -62,6 +62,23 @@ public final class JsonUtil {
         }
     }
 
+    /**
+     * Object to byte array
+     * @param source object
+     * @return byte array
+     */
+    public static byte[] toJsonBytes(Object source) {
+        if (source == null) {
+            return null;
+        }
+        try {
+            return OBJECT_MAPPER.writeValueAsBytes(source);
+        } catch (JsonProcessingException e) {
+            log.error(e.getMessage(), e);
+            return null;
+        }
+    }
+
     public static <T> T fromJson(String jsonStr, Class<T> clazz) {
         if (!StringUtils.hasText(jsonStr)) {
             return null;
@@ -74,6 +91,25 @@ public final class JsonUtil {
         }
     }
 
+    /**
+     * byte array to Object
+     * @param jsonBytes json byte array
+     * @param clazz object class
+     * @param <T> object type
+     * @return object
+     */
+    public static <T> T fromJson(byte[] jsonBytes, Class<T> clazz) {
+        if (jsonBytes == null || jsonBytes.length == 0) {
+            return null;
+        }
+        try {
+            return OBJECT_MAPPER.readValue(jsonBytes, clazz);
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return null;
+        }
+    }
+
     public static <T> T fromJson(String jsonStr, TypeReference<T> type) {
         if (!StringUtils.hasText(jsonStr)) {
             return null;
@@ -85,7 +121,7 @@ public final class JsonUtil {
             return null;
         }
     }
-    
+
     public static JsonNode fromJson(String jsonStr) {
         if (!StringUtils.hasText(jsonStr)) {
             return null;
@@ -143,4 +179,4 @@ public final class JsonUtil {
         char end = jsonStr.charAt(jsonStr.length() - 1);
         return (start == '{' && end == '}') || (start == '[' && end == ']');
     }
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
index c939200e76..3a0682fc74 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,6 @@
 
 package org.apache.hertzbeat.manager.scheduler;
 
-import com.google.protobuf.ByteString;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -44,7 +43,7 @@ import 
org.apache.hertzbeat.common.entity.manager.CollectorMonitorBind;
 import org.apache.hertzbeat.common.entity.manager.Monitor;
 import org.apache.hertzbeat.common.entity.manager.Param;
 import org.apache.hertzbeat.common.entity.manager.ParamDefine;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.util.AesUtil;
 import org.apache.hertzbeat.common.util.JsonUtil;
@@ -155,9 +154,9 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
                 List<Param> params = 
paramDao.findParamsByMonitorId(monitor.getId());
                 List<Configmap> configmaps = params.stream()
                         .map(param -> Configmap.builder()
-                                        .key(param.getField())
-                                        .value(param.getParamValue())
-                                        
.type(param.getType()).build()).collect(Collectors.toList());
+                                .key(param.getField())
+                                .value(param.getParamValue())
+                                
.type(param.getType()).build()).collect(Collectors.toList());
                 List<ParamDefine> paramDefaultValue = 
appDefine.getParams().stream()
                         .filter(item -> 
StringUtils.isNotBlank(item.getDefaultValue()))
                         .toList();
@@ -214,10 +213,10 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
                     if 
(CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) {
                         collectJobService.addAsyncCollectJob(job);
                     } else {
-                        ClusterMsg.Message message = 
ClusterMsg.Message.newBuilder()
-                                .setDirection(ClusterMsg.Direction.REQUEST)
-                                
.setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK)
-                                
.setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+                        ClusterMessage message = ClusterMessage.builder()
+                                .direction(ClusterMessage.Direction.REQUEST)
+                                
.type(ClusterMessage.MessageType.ISSUE_CYCLIC_TASK)
+                                .msg(JsonUtil.toJsonBytes(job))
                                 .build();
                         this.manageServer.sendMsg(collectorName, message);
                     }
@@ -229,10 +228,10 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
                 if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) 
{
                     assignJobs.getRemovingJobs().forEach(jobId -> 
collectJobService.cancelAsyncCollectJob(jobId));
                 } else {
-                    ClusterMsg.Message message = 
ClusterMsg.Message.newBuilder()
-                            .setDirection(ClusterMsg.Direction.REQUEST)
-                            .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK)
-                            
.setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(assignJobs.getRemovingJobs())))
+                    ClusterMessage message = ClusterMessage.builder()
+                            .direction(ClusterMessage.Direction.REQUEST)
+                            
.type(ClusterMessage.MessageType.DELETE_CYCLIC_TASK)
+                            
.msg(JsonUtil.toJsonBytes(assignJobs.getRemovingJobs()))
                             .build();
                     this.manageServer.sendMsg(collectorName, message);
                 }
@@ -243,13 +242,13 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
 
     @Override
     public boolean offlineCollector(String identity) {
-        ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                .setType(ClusterMsg.MessageType.GO_OFFLINE)
-                .setDirection(ClusterMsg.Direction.REQUEST)
-                .setIdentity(identity)
+        ClusterMessage message = ClusterMessage.builder()
+                .type(ClusterMessage.MessageType.GO_OFFLINE)
+                .direction(ClusterMessage.Direction.REQUEST)
+                .identity(identity)
                 .build();
-        ClusterMsg.Message response = this.manageServer.sendMsgSync(identity, 
message);
-        if (response == null || 
!String.valueOf(CommonConstants.SUCCESS_CODE).equals(response.getMsg().toStringUtf8()))
 {
+        ClusterMessage response = this.manageServer.sendMsgSync(identity, 
message);
+        if (response == null || 
!String.valueOf(CommonConstants.SUCCESS_CODE).equals(response.getMsgString())) {
             return false;
         }
         log.info("send offline collector message to {} success", identity);
@@ -264,14 +263,14 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
             return false;
         }
         ServerInfo serverInfo = 
ServerInfo.builder().aesSecret(AesUtil.getDefaultSecretKey()).build();
-        ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                .setType(ClusterMsg.MessageType.GO_ONLINE)
-                .setDirection(ClusterMsg.Direction.REQUEST)
-                .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(serverInfo)))
-                .setIdentity(identity)
+        ClusterMessage message = ClusterMessage.builder()
+                .type(ClusterMessage.MessageType.GO_ONLINE)
+                .direction(ClusterMessage.Direction.REQUEST)
+                .msg(JsonUtil.toJsonBytes(serverInfo))
+                .identity(identity)
                 .build();
-        ClusterMsg.Message response = this.manageServer.sendMsgSync(identity, 
message);
-        if (response == null || 
!String.valueOf(CommonConstants.SUCCESS_CODE).equals(response.getMsg().toStringUtf8()))
 {
+        ClusterMessage response = this.manageServer.sendMsgSync(identity, 
message);
+        if (response == null || 
!String.valueOf(CommonConstants.SUCCESS_CODE).equals(response.getMsgString())) {
             return false;
         }
         log.info("send online collector message to {} success", identity);
@@ -304,10 +303,10 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
             List<CollectRep.MetricsData> metricsData = new LinkedList<>();
             CountDownLatch countDownLatch = new CountDownLatch(1);
 
-            ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                    .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
-                    .setDirection(ClusterMsg.Direction.REQUEST)
-                    .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+            ClusterMessage message = ClusterMessage.builder()
+                    .type(ClusterMessage.MessageType.ISSUE_ONE_TIME_TASK)
+                    .direction(ClusterMessage.Direction.REQUEST)
+                    .msg(JsonUtil.toJsonBytes(job))
                     .build();
             boolean result = this.manageServer.sendMsg(node.getIdentity(), 
message);
 
@@ -347,10 +346,10 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
             return collectJobService.collectSyncJobData(job);
         }
         List<CollectRep.MetricsData> metricsData = new LinkedList<>();
-        ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK)
-                .setDirection(ClusterMsg.Direction.REQUEST)
-                .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+        ClusterMessage message = ClusterMessage.builder()
+                .type(ClusterMessage.MessageType.ISSUE_ONE_TIME_TASK)
+                .direction(ClusterMessage.Direction.REQUEST)
+                .msg(JsonUtil.toJsonBytes(job))
                 .build();
         boolean result = this.manageServer.sendMsg(node.getIdentity(), 
message);
         if (result) {
@@ -399,10 +398,10 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
         if (CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) {
             collectJobService.addAsyncCollectJob(job);
         } else {
-            ClusterMsg.Message message = ClusterMsg.Message.newBuilder()
-                    .setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK)
-                    .setDirection(ClusterMsg.Direction.REQUEST)
-                    .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job)))
+            ClusterMessage message = ClusterMessage.builder()
+                    .type(ClusterMessage.MessageType.ISSUE_CYCLIC_TASK)
+                    .direction(ClusterMessage.Direction.REQUEST)
+                    .msg(JsonUtil.toJsonBytes(job))
                     .build();
             this.manageServer.sendMsg(node.getIdentity(), message);
         }
@@ -443,10 +442,10 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
                 if 
(CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) {
                     collectJobService.cancelAsyncCollectJob(jobId);
                 } else {
-                    ClusterMsg.Message deleteMessage = 
ClusterMsg.Message.newBuilder()
-                            .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK)
-                            .setDirection(ClusterMsg.Direction.REQUEST)
-                            
.setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(List.of(jobId))))
+                    ClusterMessage deleteMessage = ClusterMessage.builder()
+                            
.type(ClusterMessage.MessageType.DELETE_CYCLIC_TASK)
+                            .direction(ClusterMessage.Direction.REQUEST)
+                            .msg(JsonUtil.toJsonBytes(List.of(jobId)))
                             .build();
                     this.manageServer.sendMsg(node.getIdentity(), 
deleteMessage);
                 }
@@ -471,4 +470,4 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
     public void setManageServer(ManageServer manageServer) {
         this.manageServer = manageServer;
     }
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
index 352415b65f..fe2d5e6bb2 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
@@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.alert.calculate.CollectorAlertHandler;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.CommonThreadPool;
 import org.apache.hertzbeat.manager.scheduler.CollectorJobScheduler;
 import org.apache.hertzbeat.manager.scheduler.SchedulerProperties;
@@ -83,12 +83,12 @@ public class ManageServer implements CommandLineRunner {
         this.remotingServer = new NettyRemotingServer(nettyServerConfig, 
nettyEventListener, threadPool);
         
         // register processor
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, new 
HeartbeatProcessor(this));
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.GO_ONLINE, new 
CollectorOnlineProcessor(this));
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.GO_OFFLINE, new 
CollectorOfflineProcessor(this));
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.RESPONSE_ONE_TIME_TASK_DATA,
 new CollectOneTimeDataResponseProcessor(this));
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_DATA,
 new CollectCyclicDataResponseProcessor());
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA,
 new CollectCyclicServiceDiscoveryDataResponseProcessor());
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.HEARTBEAT, new 
HeartbeatProcessor(this));
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.GO_ONLINE, new 
CollectorOnlineProcessor(this));
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.GO_OFFLINE, 
new CollectorOfflineProcessor(this));
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.RESPONSE_ONE_TIME_TASK_DATA,
 new CollectOneTimeDataResponseProcessor(this));
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.RESPONSE_CYCLIC_TASK_DATA,
 new CollectCyclicDataResponseProcessor());
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.RESPONSE_CYCLIC_TASK_SD_DATA,
 new CollectCyclicServiceDiscoveryDataResponseProcessor());
 
         this.channelSchedule = Executors.newSingleThreadScheduledExecutor();
     }
@@ -144,7 +144,7 @@ public class ManageServer implements CommandLineRunner {
         Channel channel = this.getChannel(identity);
         if (channel != null) {
             this.collectorJobScheduler.collectorGoOffline(identity);
-            ClusterMsg.Message message = 
ClusterMsg.Message.newBuilder().setType(ClusterMsg.MessageType.GO_CLOSE).build();
+            ClusterMessage message = 
ClusterMessage.builder().type(ClusterMessage.MessageType.GO_CLOSE).build();
             this.remotingServer.sendMsg(channel, message);
             this.clientChannelTable.remove(identity);
             log.info("close collect client success, identity: {}", identity);
@@ -156,7 +156,7 @@ public class ManageServer implements CommandLineRunner {
         return channel != null && channel.isActive();
     }
 
-    public boolean sendMsg(final String identityId, final ClusterMsg.Message 
message) {
+    public boolean sendMsg(final String identityId, final ClusterMessage 
message) {
         Channel channel = this.getChannel(identityId);
         if (channel != null) {
             this.remotingServer.sendMsg(channel, message);
@@ -165,7 +165,7 @@ public class ManageServer implements CommandLineRunner {
         return false;
     }
 
-    public ClusterMsg.Message sendMsgSync(final String identityId, final 
ClusterMsg.Message message) {
+    public ClusterMessage sendMsgSync(final String identityId, final 
ClusterMessage message) {
         Channel channel = this.getChannel(identityId);
         if (channel != null) {
             return this.remotingServer.sendMsgSync(channel, message, 3000);
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicDataResponseProcessor.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicDataResponseProcessor.java
index 74d415c27c..12937d2574 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicDataResponseProcessor.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicDataResponseProcessor.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,7 +20,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
 import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.queue.CommonDataQueue;
 import org.apache.hertzbeat.common.support.SpringContextHolder;
@@ -33,9 +33,9 @@ import 
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 @Slf4j
 public class CollectCyclicDataResponseProcessor implements 
NettyRemotingProcessor {
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         CommonDataQueue dataQueue = 
SpringContextHolder.getBean(CommonDataQueue.class);
-        List<CollectRep.MetricsData> metricsDataList = 
ArrowUtil.deserializeMetricsData(message.getMsg().toByteArray());
+        List<CollectRep.MetricsData> metricsDataList = 
ArrowUtil.deserializeMetricsData(message.getMsg());
         for (CollectRep.MetricsData metricsData : metricsDataList) {
             if (metricsData != null) {
                 dataQueue.sendMetricsData(metricsData);
@@ -43,4 +43,4 @@ public class CollectCyclicDataResponseProcessor implements 
NettyRemotingProcesso
         }
         return null;
     }
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicServiceDiscoveryDataResponseProcessor.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicServiceDiscoveryDataResponseProcessor.java
index d8e8a8c248..174d0e7adb 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicServiceDiscoveryDataResponseProcessor.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectCyclicServiceDiscoveryDataResponseProcessor.java
@@ -20,7 +20,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
 import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.queue.CommonDataQueue;
 import org.apache.hertzbeat.common.support.SpringContextHolder;
@@ -33,9 +33,9 @@ import 
org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 @Slf4j
 public class CollectCyclicServiceDiscoveryDataResponseProcessor implements 
NettyRemotingProcessor {
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         CommonDataQueue dataQueue = 
SpringContextHolder.getBean(CommonDataQueue.class);
-        List<CollectRep.MetricsData> metricsDataList = 
ArrowUtil.deserializeMetricsData(message.getMsg().toByteArray());
+        List<CollectRep.MetricsData> metricsDataList = 
ArrowUtil.deserializeMetricsData(message.getMsg());
         for (CollectRep.MetricsData metricsData : metricsDataList) {
             if (metricsData != null) {
                 dataQueue.sendServiceDiscoveryData(metricsData);
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectOneTimeDataResponseProcessor.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectOneTimeDataResponseProcessor.java
index b9e064d7b4..cabf016b39 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectOneTimeDataResponseProcessor.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectOneTimeDataResponseProcessor.java
@@ -20,7 +20,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
 import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.util.ArrowUtil;
 import org.apache.hertzbeat.manager.scheduler.netty.ManageServer;
@@ -39,9 +39,9 @@ public class CollectOneTimeDataResponseProcessor implements 
NettyRemotingProcess
     }
 
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
 
-        List<CollectRep.MetricsData> metricsDataList = 
ArrowUtil.deserializeMetricsData(message.getMsg().toByteArray());
+        List<CollectRep.MetricsData> metricsDataList = 
ArrowUtil.deserializeMetricsData(message.getMsg());
         
this.manageServer.getCollectorAndJobScheduler().collectSyncJobResponse(metricsDataList);
         return null;
     }
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOfflineProcessor.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOfflineProcessor.java
index 85c5b0d132..137af5f0b5 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOfflineProcessor.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOfflineProcessor.java
@@ -19,7 +19,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
 
 import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.manager.scheduler.netty.ManageServer;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
@@ -36,7 +36,7 @@ public class CollectorOfflineProcessor implements 
NettyRemotingProcessor {
     }
 
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         String collector = message.getIdentity();
         log.info("the collector {} actively requests to go offline.", 
collector);
         
this.manageServer.getCollectorAndJobScheduler().collectorGoOffline(collector);
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java
index 9a7e4f4e08..5486d4af3a 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java
@@ -17,14 +17,13 @@
 
 package org.apache.hertzbeat.manager.scheduler.netty.process;
 
-import com.google.protobuf.ByteString;
 import io.netty.channel.ChannelHandlerContext;
 import java.net.InetSocketAddress;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hertzbeat.common.entity.dto.CollectorInfo;
 import org.apache.hertzbeat.common.entity.dto.ServerInfo;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.util.AesUtil;
 import org.apache.hertzbeat.common.util.JsonUtil;
 import org.apache.hertzbeat.manager.scheduler.netty.ManageServer;
@@ -42,10 +41,10 @@ public class CollectorOnlineProcessor implements 
NettyRemotingProcessor {
     }
 
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         String collector = message.getIdentity();
         log.info("the collector {} actively requests to go online.", 
collector);
-        String msg = message.getMsg().toStringUtf8();
+        String msg = message.getMsgString();
         CollectorInfo collectorInfo = JsonUtil.fromJson(msg, 
CollectorInfo.class);
         if (collectorInfo != null && 
StringUtils.isBlank(collectorInfo.getIp())) {
             // fetch remote ip address
@@ -56,11 +55,11 @@ public class CollectorOnlineProcessor implements 
NettyRemotingProcessor {
         this.manageServer.addChannel(collector, ctx.channel());
         
this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(collector, 
collectorInfo);
         ServerInfo serverInfo = 
ServerInfo.builder().aesSecret(AesUtil.getDefaultSecretKey()).build();
-        return ClusterMsg.Message.newBuilder()
-                .setIdentity(message.getIdentity())
-                .setDirection(ClusterMsg.Direction.RESPONSE)
-                .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(serverInfo)))
-                .setType(ClusterMsg.MessageType.GO_ONLINE)
+        return ClusterMessage.builder()
+                .identity(message.getIdentity())
+                .direction(ClusterMessage.Direction.RESPONSE)
+                .msg(JsonUtil.toJsonBytes(serverInfo))
+                .type(ClusterMessage.MessageType.GO_ONLINE)
                 .build();
     }
 }
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
index 8cf7050279..62e873d0cf 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java
@@ -19,7 +19,7 @@ package org.apache.hertzbeat.manager.scheduler.netty.process;
 
 import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.manager.scheduler.netty.ManageServer;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
@@ -36,7 +36,7 @@ public class HeartbeatProcessor implements 
NettyRemotingProcessor {
     }
 
     @Override
-    public ClusterMsg.Message handle(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
+    public ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage 
message) {
         String identity = message.getIdentity();
         boolean isChannelActive = this.manageServer.isChannelActive(identity);
         if (!isChannelActive) {
@@ -52,8 +52,10 @@ public class HeartbeatProcessor implements 
NettyRemotingProcessor {
         if (log.isDebugEnabled()) {
             log.debug("server receive collector {} heartbeat", 
message.getIdentity());
         }
-        return ClusterMsg.Message.newBuilder()
-                .setType(ClusterMsg.MessageType.HEARTBEAT)
+        return ClusterMessage.builder()
+                .identity(message.getIdentity())
+                .type(ClusterMessage.MessageType.HEARTBEAT)
+                .direction(ClusterMessage.Direction.RESPONSE)
                 .build();
     }
 }
diff --git 
a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java
 
b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java
index e068c7b7fb..e77535445a 100644
--- 
a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java
+++ 
b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.hertzbeat.common.entity.dto.CollectorInfo;
 import org.apache.hertzbeat.common.entity.job.Job;
 import org.apache.hertzbeat.common.entity.manager.CollectorMonitorBind;
 import org.apache.hertzbeat.common.entity.manager.Monitor;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.util.JsonUtil;
 import org.apache.hertzbeat.manager.dao.CollectorDao;
@@ -137,11 +137,11 @@ public class CollectorJobSchedulerTest {
         collectorJobScheduler.collectorGoOnline(identity, collectorInfo);
 
         // Capture the parameters of sendMsg
-        ArgumentCaptor<ClusterMsg.Message> msgCaptor = 
ArgumentCaptor.forClass(ClusterMsg.Message.class);
+        ArgumentCaptor<ClusterMessage> msgCaptor = 
ArgumentCaptor.forClass(ClusterMessage.class);
         verify(manageServer, atLeastOnce()).sendMsg(eq("collector-1"), 
msgCaptor.capture());
-        ClusterMsg.Message message = msgCaptor.getValue();
+        ClusterMessage message = msgCaptor.getValue();
 
-        Job job = JsonUtil.fromJson(message.getMsg().toStringUtf8(), 
Job.class);
+        Job job = JsonUtil.fromJson(message.getMsgString(), Job.class);
         assertNotNull(job);
         assertNotNull(job.getMetadata());
         assertEquals("test-monitor", 
job.getMetadata().get(CommonConstants.LABEL_INSTANCE_NAME));
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingClient.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingClient.java
index dc5b4a557c..af363641af 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingClient.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingClient.java
@@ -17,7 +17,7 @@
 
 package org.apache.hertzbeat.remoting;
 
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
 /**
@@ -30,13 +30,13 @@ public interface RemotingClient extends RemotingService {
      * @param messageType type
      * @param processor remoting processor
      */
-    void registerProcessor(ClusterMsg.MessageType messageType, 
NettyRemotingProcessor processor);
+    void registerProcessor(ClusterMessage.MessageType messageType, 
NettyRemotingProcessor processor);
 
     /**
      * send message to server
      * @param request request message
      */
-    void sendMsg(ClusterMsg.Message request);
+    void sendMsg(ClusterMessage request);
 
     /**
      * send message to server and sync waiting receive server message
@@ -44,5 +44,5 @@ public interface RemotingClient extends RemotingService {
      * @param timeoutMillis timeout millis
      * @return response message
      */
-    ClusterMsg.Message sendMsgSync(ClusterMsg.Message request,  int 
timeoutMillis);
+    ClusterMessage sendMsgSync(ClusterMessage request,  int timeoutMillis);
 }
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingServer.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingServer.java
index 35fdd1d48f..b9af22cd84 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingServer.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/RemotingServer.java
@@ -19,7 +19,7 @@ package org.apache.hertzbeat.remoting;
 
 import io.netty.channel.Channel;
 import java.util.List;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.remoting.netty.NettyHook;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingProcessor;
 
@@ -33,14 +33,14 @@ public interface RemotingServer extends RemotingService {
      * @param messageType type
      * @param processor remoting processor
      */
-    void registerProcessor(ClusterMsg.MessageType messageType, 
NettyRemotingProcessor processor);
+    void registerProcessor(ClusterMessage.MessageType messageType, 
NettyRemotingProcessor processor);
 
     /**
      * send message to client
      * @param channel client channel
      * @param request request message
      */
-    void sendMsg(Channel channel, ClusterMsg.Message request);
+    void sendMsg(Channel channel, ClusterMessage request);
 
     /**
      * send message to client and receive client message
@@ -49,7 +49,7 @@ public interface RemotingServer extends RemotingService {
      * @param timeoutMillis timeout millis
      * @return response message
      */
-    ClusterMsg.Message sendMsgSync(Channel channel, ClusterMsg.Message 
request, int timeoutMillis);
+    ClusterMessage sendMsgSync(Channel channel, ClusterMessage request, int 
timeoutMillis);
 
     /**
      * register hook.
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyHook.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyHook.java
index 27d149b5cb..0b75b4a8f6 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyHook.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyHook.java
@@ -18,13 +18,13 @@
 package org.apache.hertzbeat.remoting.netty;
 
 import io.netty.channel.ChannelHandlerContext;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 
 /**
  * hook interface, handle something before request processor
  */
 public interface NettyHook {
 
-    void doBeforeRequest(ChannelHandlerContext ctx, ClusterMsg.Message 
message);
+    void doBeforeRequest(ChannelHandlerContext ctx, ClusterMessage message);
 
 }
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
index 3d9390f467..a721f7da3f 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingAbstract.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,19 +27,19 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.util.NetworkUtil;
 import org.apache.hertzbeat.remoting.RemotingService;
 import org.apache.hertzbeat.remoting.event.NettyEventListener;
 
 /**
- * Derived from Apache Rocketmq 
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract 
+ * Derived from Apache Rocketmq 
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
  * netty remote abstract
  * @see <a 
href="https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java";>NettyRemotingAbstract</a>
  */
 @Slf4j
 public abstract class NettyRemotingAbstract implements RemotingService {
-    protected ConcurrentHashMap<ClusterMsg.MessageType, 
NettyRemotingProcessor> processorTable = new ConcurrentHashMap<>();
+    protected ConcurrentHashMap<ClusterMessage.MessageType, 
NettyRemotingProcessor> processorTable = new ConcurrentHashMap<>();
 
     protected ConcurrentHashMap<String, ResponseFuture> responseTable = new 
ConcurrentHashMap<>();
 
@@ -51,33 +51,32 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
         this.nettyEventListener = nettyEventListener;
     }
 
-    public void registerProcessor(final ClusterMsg.MessageType messageType, 
final NettyRemotingProcessor processor) {
+    public void registerProcessor(final ClusterMessage.MessageType 
messageType, final NettyRemotingProcessor processor) {
         this.processorTable.put(messageType, processor);
     }
 
-    protected void processReceiveMsg(ChannelHandlerContext ctx, 
ClusterMsg.Message message) {
-        if (ClusterMsg.Direction.REQUEST.equals(message.getDirection())) {
+    protected void processReceiveMsg(ChannelHandlerContext ctx, ClusterMessage 
message) {
+        if (ClusterMessage.Direction.REQUEST.equals(message.getDirection())) {
             this.processRequestMsg(ctx, message);
         } else {
             this.processResponseMsg(ctx, message);
         }
     }
 
-    protected void processRequestMsg(ChannelHandlerContext ctx, 
ClusterMsg.Message request) {
+    protected void processRequestMsg(ChannelHandlerContext ctx, ClusterMessage 
request) {
         this.doBeforeRequest(ctx, request);
-
         NettyRemotingProcessor processor = 
this.processorTable.get(request.getType());
         if (processor == null) {
             log.info("request type {} not supported", request.getType());
             return;
         }
-        ClusterMsg.Message response = processor.handle(ctx, request);
+        ClusterMessage response = processor.handle(ctx, request);
         if (response != null) {
             ctx.writeAndFlush(response);
         }
     }
 
-    private void doBeforeRequest(ChannelHandlerContext ctx, ClusterMsg.Message 
request) {
+    private void doBeforeRequest(ChannelHandlerContext ctx, ClusterMessage 
request) {
         if (CollectionUtils.isEmpty(this.nettyHookList)) {
             return;
         }
@@ -86,7 +85,7 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
         }
     }
 
-    protected void processResponseMsg(ChannelHandlerContext ctx, 
ClusterMsg.Message response) {
+    protected void processResponseMsg(ChannelHandlerContext ctx, 
ClusterMessage response) {
         // for sync response
         if (this.responseTable.containsKey(response.getIdentity())) {
             ResponseFuture responseFuture = 
this.responseTable.get(response.getIdentity());
@@ -95,15 +94,15 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
             // async response
             NettyRemotingProcessor processor = 
this.processorTable.get(response.getType());
             if (processor != null) {
-                ClusterMsg.Message repMessage = processor.handle(ctx, 
response);
+                ClusterMessage repMessage = processor.handle(ctx, response);
                 if (repMessage != null) {
                     ctx.writeAndFlush(repMessage);
                 }
-            }   
+            }
         }
     }
 
-    protected void sendMsgImpl(final Channel channel, final ClusterMsg.Message 
request) {
+    protected void sendMsgImpl(final Channel channel, final ClusterMessage 
request) {
         channel.writeAndFlush(request).addListener(future -> {
             if (!future.isSuccess()) {
                 log.warn("send request message failed. address: {}, ", 
channel.remoteAddress(), future.cause());
@@ -111,7 +110,7 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
         });
     }
 
-    protected ClusterMsg.Message sendMsgSyncImpl(final Channel channel, final 
ClusterMsg.Message request, final int timeoutMillis) {
+    protected ClusterMessage sendMsgSyncImpl(final Channel channel, final 
ClusterMessage request, final int timeoutMillis) {
         final String identity = request.getIdentity();
 
         try {
@@ -123,7 +122,7 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
                     log.warn("send request message failed. request: {}, 
address: {}, ", request, channel.remoteAddress(), future.cause());
                 }
             });
-            ClusterMsg.Message response = 
responseFuture.waitResponse(timeoutMillis);
+            ClusterMessage response = 
responseFuture.waitResponse(timeoutMillis);
             if (response == null) {
                 log.warn("get response message failed, message is null");
             }
@@ -155,4 +154,4 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
                 && Epoll.isAvailable();
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
index a01c7fab80..7c6233f454 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -29,21 +29,18 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.compression.ZlibCodecFactory;
-import io.netty.handler.codec.compression.ZlibWrapper;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufEncoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
 import java.util.concurrent.ThreadFactory;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.CommonThreadPool;
 import org.apache.hertzbeat.remoting.RemotingClient;
 import org.apache.hertzbeat.remoting.event.NettyEventListener;
+import org.apache.hertzbeat.remoting.netty.codec.ForyCodec;
 
 /**
- * Derived from Apache Rocketmq 
org.apache.rocketmq.remoting.netty.NettyRemotingClient 
+ * Derived from Apache Rocketmq 
org.apache.rocketmq.remoting.netty.NettyRemotingClient
  * netty client
  * @see <a 
href="https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java";>NettyRemotingClient</a>
  */
@@ -51,7 +48,7 @@ import org.apache.hertzbeat.remoting.event.NettyEventListener;
 public class NettyRemotingClient extends NettyRemotingAbstract implements 
RemotingClient {
 
     private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4, 
Runtime.getRuntime().availableProcessors());
-    
+
     private final NettyClientConfig nettyClientConfig;
 
     private final CommonThreadPool threadPool;
@@ -81,7 +78,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     .setDaemon(true)
                     .setNameFormat("netty-client-worker-%d")
                     .build();
-            String envThreadNum = 
System.getProperty("hertzbeat.client.worker.thread.num"); 
+            String envThreadNum = 
System.getProperty("hertzbeat.client.worker.thread.num");
             int workerThreadNum = envThreadNum != null ? 
Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM;
             this.workerGroup = new NioEventLoopGroup(workerThreadNum, 
threadFactory);
             this.bootstrap.group(workerGroup)
@@ -96,7 +93,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
 
             this.channel = null;
             boolean first = true;
-            while (!Thread.currentThread().isInterrupted() 
+            while (!Thread.currentThread().isInterrupted()
                     && (first || this.channel == null || 
!this.channel.isActive())) {
                 first = false;
                 try {
@@ -121,14 +118,11 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
 
     private void initChannel(final SocketChannel channel) {
         ChannelPipeline pipeline = channel.pipeline();
-        // zip
-        pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
-        pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
-        // protocol buf encode decode
-        pipeline.addLast(new ProtobufVarint32FrameDecoder());
-        pipeline.addLast(new 
ProtobufDecoder(ClusterMsg.Message.getDefaultInstance()));
-        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
-        pipeline.addLast(new ProtobufEncoder());
+        // max frame length 10MB
+        pipeline.addLast(new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 
4, 0, 4));
+        pipeline.addLast(new LengthFieldPrepender(4));
+        // fory codec
+        pipeline.addLast(new ForyCodec());
         pipeline.addLast(new NettyClientHandler());
 
     }
@@ -155,16 +149,16 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
     }
 
     @Override
-    public void sendMsg(final ClusterMsg.Message request) {
+    public void sendMsg(final ClusterMessage request) {
         this.sendMsgImpl(this.channel, request);
     }
 
     @Override
-    public ClusterMsg.Message sendMsgSync(ClusterMsg.Message request, int 
timeoutMillis) {
+    public ClusterMessage sendMsgSync(ClusterMessage request, int 
timeoutMillis) {
         return this.sendMsgSyncImpl(this.channel, request, timeoutMillis);
     }
 
-    class NettyClientHandler extends 
SimpleChannelInboundHandler<ClusterMsg.Message> {
+    class NettyClientHandler extends 
SimpleChannelInboundHandler<ClusterMessage> {
 
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -172,7 +166,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        protected void channelRead0(ChannelHandlerContext ctx, 
ClusterMsg.Message msg) throws Exception {
+        protected void channelRead0(ChannelHandlerContext ctx, ClusterMessage 
msg) throws Exception {
             NettyRemotingClient.this.processReceiveMsg(ctx, msg);
         }
 
@@ -181,4 +175,4 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             NettyRemotingClient.this.channelIdle(ctx, evt);
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingProcessor.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingProcessor.java
index 27c68dc1ff..12e7739db6 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingProcessor.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingProcessor.java
@@ -18,7 +18,7 @@
 package org.apache.hertzbeat.remoting.netty;
 
 import io.netty.channel.ChannelHandlerContext;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 
 /**
  * Derived from Apache Rocketmq 
org.apache.rocketmq.remoting.netty.NettyRequestProcessor 
@@ -27,6 +27,6 @@ import org.apache.hertzbeat.common.entity.message.ClusterMsg;
  */
 public interface NettyRemotingProcessor {
 
-    ClusterMsg.Message handle(ChannelHandlerContext ctx, ClusterMsg.Message 
message);
+    ClusterMessage handle(ChannelHandlerContext ctx, ClusterMessage message);
 
 }
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
index 48062fadb9..63eb567e8f 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -32,25 +32,22 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.compression.ZlibCodecFactory;
-import io.netty.handler.codec.compression.ZlibWrapper;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufEncoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.timeout.IdleStateHandler;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.CommonThreadPool;
 import org.apache.hertzbeat.remoting.RemotingServer;
 import org.apache.hertzbeat.remoting.event.NettyEventListener;
+import org.apache.hertzbeat.remoting.netty.codec.ForyCodec;
 
 /**
- * Derived from Apache Rocketmq 
org.apache.rocketmq.remoting.netty.NettyRemotingServer 
+ * Derived from Apache Rocketmq 
org.apache.rocketmq.remoting.netty.NettyRemotingServer
  * netty server
  * @see <a 
href="https://github.com/apache/rocketmq/blob/develop/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java";>NettyRemotingServer</a>
  */
@@ -137,14 +134,11 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
     private void initChannel(final SocketChannel channel) {
         ChannelPipeline pipeline = channel.pipeline();
-        // zip
-        pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
-        pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
-        // protocol buf encode decode
-        pipeline.addLast(new ProtobufVarint32FrameDecoder());
-        pipeline.addLast(new 
ProtobufDecoder(ClusterMsg.Message.getDefaultInstance()));
-        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
-        pipeline.addLast(new ProtobufEncoder());
+        // fory codec
+        // max frame length 10MB
+        pipeline.addLast(new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 
4, 0, 4));
+        pipeline.addLast(new LengthFieldPrepender(4));
+        pipeline.addLast(new ForyCodec());
         // idle state
         pipeline.addLast(new IdleStateHandler(0, 0, 
nettyServerConfig.getIdleStateEventTriggerTime()));
         pipeline.addLast(new NettyServerHandler());
@@ -165,12 +159,12 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     }
 
     @Override
-    public void sendMsg(final Channel channel, final ClusterMsg.Message 
request) {
+    public void sendMsg(final Channel channel, final ClusterMessage request) {
         this.sendMsgImpl(channel, request);
     }
 
     @Override
-    public ClusterMsg.Message sendMsgSync(final Channel channel, final 
ClusterMsg.Message request, final int timeoutMillis) {
+    public ClusterMessage sendMsgSync(final Channel channel, final 
ClusterMessage request, final int timeoutMillis) {
         return this.sendMsgSyncImpl(channel, request, timeoutMillis);
     }
 
@@ -183,7 +177,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
      * netty server handler
      */
     @ChannelHandler.Sharable
-    public class NettyServerHandler extends 
SimpleChannelInboundHandler<ClusterMsg.Message> {
+    public class NettyServerHandler extends 
SimpleChannelInboundHandler<ClusterMessage> {
 
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -191,7 +185,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        protected void channelRead0(ChannelHandlerContext ctx, 
ClusterMsg.Message msg) throws Exception {
+        protected void channelRead0(ChannelHandlerContext ctx, ClusterMessage 
msg) {
             NettyRemotingServer.this.processReceiveMsg(ctx, msg);
         }
 
@@ -200,4 +194,4 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
             NettyRemotingServer.this.channelIdle(ctx, evt);
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/ResponseFuture.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/ResponseFuture.java
index 3a9a38295e..34ecbf2a1a 100644
--- 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/ResponseFuture.java
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/ResponseFuture.java
@@ -19,7 +19,7 @@ package org.apache.hertzbeat.remoting.netty;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 
 /**
  * netty response future
@@ -28,14 +28,14 @@ public class ResponseFuture {
 
     private final CountDownLatch countDownLatch = new CountDownLatch(1);
 
-    private ClusterMsg.Message response;
+    private ClusterMessage response;
 
-    public ClusterMsg.Message waitResponse(final long timeoutMillis) throws 
InterruptedException {
+    public ClusterMessage waitResponse(final long timeoutMillis) throws 
InterruptedException {
         this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
         return this.response;
     }
 
-    public void putResponse(final ClusterMsg.Message response) {
+    public void putResponse(final ClusterMessage response) {
         this.response = response;
         this.countDownLatch.countDown();
     }
diff --git 
a/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/codec/ForyCodec.java
 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/codec/ForyCodec.java
new file mode 100644
index 0000000000..ad8cde8554
--- /dev/null
+++ 
b/hertzbeat-remoting/src/main/java/org/apache/hertzbeat/remoting/netty/codec/ForyCodec.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.remoting.netty.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageCodec;
+import org.apache.fory.Fory;
+import org.apache.fory.ThreadSafeFory;
+import org.apache.fory.config.Language;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
+
+import java.util.List;
+
+/**
+ * Netty codec for ClusterMessage using Apache fory
+ */
+public class ForyCodec extends ByteToMessageCodec<ClusterMessage> {
+
+    private static final ThreadSafeFory fory;
+
+    static {
+        // 1. Initialize Fory in XLANG mode
+        fory = Fory.builder()
+                .withLanguage(Language.XLANG)
+                .requireClassRegistration(false)
+                .buildThreadSafeFory();
+
+        // 2. Register classes with specific names for Cross-Language 
compatibility (Java <-> Go)
+        // These names (e.g., "ClusterMessage") must match the registration 
name in the Go client.
+        fory.register(ClusterMessage.class, "ClusterMessage");
+        fory.register(ClusterMessage.MessageType.class, "MessageType");
+        fory.register(ClusterMessage.Direction.class, "Direction");
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ClusterMessage msg, 
ByteBuf out) {
+        byte[] bytes = fory.serialize(msg);
+        out.writeBytes(bytes);
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> 
out) {
+        byte[] bytes = new byte[in.readableBytes()];
+        in.readBytes(bytes);
+        Object obj = fory.deserialize(bytes);
+        out.add(obj);
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-remoting/src/test/java/org/apache/hertzbeat/remoting/RemotingServiceTest.java
 
b/hertzbeat-remoting/src/test/java/org/apache/hertzbeat/remoting/RemotingServiceTest.java
index 4a852d99d8..cebcf37380 100644
--- 
a/hertzbeat-remoting/src/test/java/org/apache/hertzbeat/remoting/RemotingServiceTest.java
+++ 
b/hertzbeat-remoting/src/test/java/org/apache/hertzbeat/remoting/RemotingServiceTest.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,8 +17,8 @@
 
 package org.apache.hertzbeat.remoting;
 
-import com.google.protobuf.ByteString;
-import org.apache.hertzbeat.common.entity.message.ClusterMsg;
+import java.nio.charset.StandardCharsets;
+import org.apache.hertzbeat.common.entity.message.ClusterMessage;
 import org.apache.hertzbeat.common.support.CommonThreadPool;
 import org.apache.hertzbeat.remoting.netty.NettyClientConfig;
 import org.apache.hertzbeat.remoting.netty.NettyRemotingClient;
@@ -102,15 +102,15 @@ public class RemotingServiceTest {
     public void testSendMsg() {
         final String msg = "hello world";
 
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, (ctx, 
message) -> {
-            Assertions.assertEquals(msg, message.getMsg().toStringUtf8());
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.HEARTBEAT, 
(ctx, message) -> {
+            Assertions.assertEquals(msg, message.getMsgString());
             return null;
         });
 
-        ClusterMsg.Message request = ClusterMsg.Message.newBuilder()
-                .setDirection(ClusterMsg.Direction.REQUEST)
-                .setType(ClusterMsg.MessageType.HEARTBEAT)
-                .setMsg(ByteString.copyFromUtf8(msg))
+        ClusterMessage request = ClusterMessage.builder()
+                .direction(ClusterMessage.Direction.REQUEST)
+                .type(ClusterMessage.MessageType.HEARTBEAT)
+                .msg(msg.getBytes(StandardCharsets.UTF_8))
                 .build();
         this.remotingClient.sendMsg(request);
     }
@@ -120,40 +120,42 @@ public class RemotingServiceTest {
         final String requestMsg = "request";
         final String responseMsg = "response";
 
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, (ctx, 
message) -> {
-            Assertions.assertEquals(requestMsg, 
message.getMsg().toStringUtf8());
-            return ClusterMsg.Message.newBuilder()
-                    .setDirection(ClusterMsg.Direction.RESPONSE)
-                    .setMsg(ByteString.copyFromUtf8(responseMsg))
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.HEARTBEAT, 
(ctx, message) -> {
+            Assertions.assertEquals(requestMsg, message.getMsgString());
+            return ClusterMessage.builder()
+                    .direction(ClusterMessage.Direction.RESPONSE)
+                    .type(ClusterMessage.MessageType.HEARTBEAT)
+                    .msg(responseMsg.getBytes(StandardCharsets.UTF_8))
                     .build();
         });
 
-        ClusterMsg.Message request = ClusterMsg.Message.newBuilder()
-                .setDirection(ClusterMsg.Direction.REQUEST)
-                .setType(ClusterMsg.MessageType.HEARTBEAT)
-                .setMsg(ByteString.copyFromUtf8(requestMsg))
+        ClusterMessage request = ClusterMessage.builder()
+                .direction(ClusterMessage.Direction.REQUEST)
+                .type(ClusterMessage.MessageType.HEARTBEAT)
+                .msg(requestMsg.getBytes(StandardCharsets.UTF_8))
                 .build();
-        ClusterMsg.Message response = this.remotingClient.sendMsgSync(request, 
3000);
-        Assertions.assertEquals(responseMsg, response.getMsg().toStringUtf8());
+        ClusterMessage response = this.remotingClient.sendMsgSync(request, 
3000);
+        Assertions.assertEquals(responseMsg, response.getMsgString());
     }
 
     @Test
     public void testNettyHook() {
         this.remotingServer.registerHook(Lists.newArrayList(
-                (ctx, message) -> Assertions.assertEquals("hello world", 
message.getMsg().toStringUtf8())
+                (ctx, message) -> Assertions.assertEquals("hello world", 
message.getMsgString())
         ));
 
-        
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, (ctx, 
message) ->
-                ClusterMsg.Message.newBuilder()
-                        .setDirection(ClusterMsg.Direction.RESPONSE)
+        
this.remotingServer.registerProcessor(ClusterMessage.MessageType.HEARTBEAT, 
(ctx, message) ->
+                ClusterMessage.builder()
+                        .direction(ClusterMessage.Direction.RESPONSE)
+                        .type(ClusterMessage.MessageType.HEARTBEAT)
                         .build());
 
-        ClusterMsg.Message request = ClusterMsg.Message.newBuilder()
-                .setDirection(ClusterMsg.Direction.REQUEST)
-                .setType(ClusterMsg.MessageType.HEARTBEAT)
-                .setMsg(ByteString.copyFromUtf8("hello world"))
+        ClusterMessage request = ClusterMessage.builder()
+                .direction(ClusterMessage.Direction.REQUEST)
+                .type(ClusterMessage.MessageType.HEARTBEAT)
+                .msg("hello world".getBytes(StandardCharsets.UTF_8))
                 .build();
         this.remotingClient.sendMsg(request);
     }
 
-}
+}
\ No newline at end of file
diff --git a/material/licenses/backend/LICENSE 
b/material/licenses/backend/LICENSE
index 826b50efca..1234a692ad 100644
--- a/material/licenses/backend/LICENSE
+++ b/material/licenses/backend/LICENSE
@@ -437,6 +437,7 @@ The text of each license is the standard Apache 2.0 license.
     https://mvnrepository.com/artifact/com.vesoft/client/3.6.0
     https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.9.3 
Apache-2.0
     
https://mvnrepository.com/artifact/io.opentelemetry.proto/opentelemetry-proto/1.7.0-alpha
 Apache-2.0
+    https://mvnrepository.com/artifact/org.apache.fory/fory-core/0.13.1 
Apache-2.0
 
 
 ========================================================================
diff --git a/pom.xml b/pom.xml
index 4196824b5b..75ed3da453 100644
--- a/pom.xml
+++ b/pom.xml
@@ -177,6 +177,7 @@
         <opentelemetry-starter.version>2.22.0</opentelemetry-starter.version>
         
<opentelemetry-logback.version>2.22.0-alpha</opentelemetry-logback.version>
         <opentelemetry-proto.version>1.9.0-alpha</opentelemetry-proto.version>
+        <fory.version>0.13.1</fory.version>
     </properties>
 
     <dependencyManagement>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to