This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new eb19e98a60 optimize : Eliminate RpcMessage and Encoder/Decoder
dependencies (#6209)
eb19e98a60 is described below
commit eb19e98a60afd611394c648614a5683b948cd7ec
Author: justabug <[email protected]>
AuthorDate: Mon Jun 24 10:03:27 2024 +0800
optimize : Eliminate RpcMessage and Encoder/Decoder dependencies (#6209)
---
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 2 +-
.../rpc/netty/AbstractNettyRemotingClient.java | 9 +-
.../rpc/netty/AbstractNettyRemotingServer.java | 9 +-
.../seata/core/rpc/netty/ProtocolRpcMessage.java | 37 ++++
.../core/rpc/netty/v0/ProtocolRpcMessageV0.java | 203 +++++++++++++++++++++
.../core/rpc/netty/v1/ProtocolRpcMessageV1.java | 195 ++++++++++++++++++++
.../seata/core/rpc/netty/v1/ProtocolV1Decoder.java | 3 +-
.../seata/core/rpc/netty/v1/ProtocolV1Encoder.java | 4 +-
.../core/rpc/netty/v1/ClientChannelHandler.java | 8 +-
.../rpc/netty/v1/ProtocolV1SerializerTest.java | 4 +-
.../core/rpc/netty/v1/ServerChannelHandler.java | 10 +-
12 files changed, 467 insertions(+), 18 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index f4de08bc26..7941cbbf60 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -13,6 +13,7 @@ Add changes here for all PR submitted to the 2.x branch.
### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the
task thread pool for committing and rollbacking statuses
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize :
load SeataSerializer by version
+- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate
RpcMessage and Encoder/Decoder dependencies
### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize:
send async response
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index f9a0eab0c7..25ff7a0587 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -12,7 +12,7 @@
### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing
和 rollbacking 状态的任务线程池
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化
-
+- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage
和 Encoder/Decoder 的互相依赖
### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 2b7b35aca0..2901eb8d3f 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -409,10 +409,13 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
- if (!(msg instanceof RpcMessage)) {
- return;
+ RpcMessage rpcMessage = null;
+ if (msg instanceof ProtocolRpcMessage) {
+ rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
+ processMessage(ctx, rpcMessage);
+ } else {
+ LOGGER.error("rpcMessage type error");
}
- processMessage(ctx, (RpcMessage) msg);
}
@Override
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
index 09696286a6..72324c2689 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
@@ -163,10 +163,13 @@ public abstract class AbstractNettyRemotingServer extends
AbstractNettyRemoting
*/
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
- if (!(msg instanceof RpcMessage)) {
- return;
+ RpcMessage rpcMessage = null;
+ if (msg instanceof ProtocolRpcMessage) {
+ rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
+ processMessage(ctx, rpcMessage);
+ } else {
+ LOGGER.error("rpcMessage type error");
}
- processMessage(ctx, (RpcMessage) msg);
}
@Override
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java
new file mode 100644
index 0000000000..ba89c508a8
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import org.apache.seata.core.protocol.RpcMessage;
+
+/**
+ * A protocol-specific RPC message that converts to and from {@link RpcMessage}.
+ */
+public interface ProtocolRpcMessage {
+
+ /**
+ * Converts this protocol message into an {@link RpcMessage}.
+ * @return the resulting rpc message
+ */
+ RpcMessage protocolMsg2RpcMsg();
+
+ /**
+ * Populates this protocol message from the given {@link RpcMessage}.
+ * @param rpcMessage the rpc message to read from
+ */
+ void rpcMsg2ProtocolMsg(RpcMessage rpcMessage);
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java
new file mode 100644
index 0000000000..cea2d7e6f7
--- /dev/null
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+import org.apache.seata.core.compressor.CompressorType;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
+import org.apache.seata.core.serializer.SerializerType;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The protocol v0 RPC message; converts to and from {@link RpcMessage}.
+ **/
+public class ProtocolRpcMessageV0 implements ProtocolRpcMessage {
+
+ private static AtomicLong NEXT_ID = new AtomicLong(0); // shared counter behind getNextMessageId()
+
+ /**
+ * Increments the shared counter and returns the new value.
+ *
+ * @return the next message id
+ */
+ public static long getNextMessageId() {
+ return NEXT_ID.incrementAndGet();
+ }
+
+ private long id;
+ private boolean isAsync;
+ private boolean isRequest;
+ private boolean isHeartbeat;
+ private Object body;
+ private byte messageType;
+ private boolean isSeataCodec;
+
+ /**
+ * Gets the message id.
+ *
+ * @return the id
+ */
+ public long getId() {
+ return id;
+ }
+
+ /**
+ * Sets the message id.
+ *
+ * @param id the id
+ */
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ /**
+ * Returns the async flag.
+ *
+ * @return the async flag
+ */
+ public boolean isAsync() {
+ return isAsync;
+ }
+
+ /**
+ * Sets the async flag.
+ *
+ * @param async the async flag
+ */
+ public void setAsync(boolean async) {
+ isAsync = async;
+ }
+
+ /**
+ * Returns the request flag.
+ *
+ * @return the request flag
+ */
+ public boolean isRequest() {
+ return isRequest;
+ }
+
+ /**
+ * Sets the request flag.
+ *
+ * @param request the request flag
+ */
+ public void setRequest(boolean request) {
+ isRequest = request;
+ }
+
+ /**
+ * Returns the heartbeat flag.
+ *
+ * @return the heartbeat flag
+ */
+ public boolean isHeartbeat() {
+ return isHeartbeat;
+ }
+
+ /**
+ * Sets the heartbeat flag.
+ *
+ * @param heartbeat the heartbeat flag
+ */
+ public void setHeartbeat(boolean heartbeat) {
+ isHeartbeat = heartbeat;
+ }
+
+ /**
+ * Gets the message body.
+ *
+ * @return the body
+ */
+ public Object getBody() {
+ return body;
+ }
+
+ /**
+ * Sets the message body.
+ *
+ * @param body the body
+ */
+ public void setBody(Object body) {
+ this.body = body;
+ }
+
+ public boolean isSeataCodec() {
+ return isSeataCodec;
+ }
+
+ public void setSeataCodec(boolean seataCodec) {
+ isSeataCodec = seataCodec;
+ }
+
+ public byte getMessageType() {
+ return messageType;
+ }
+
+ public void setMessageType(byte messageType) {
+ this.messageType = messageType;
+ }
+
+ @Override
+ public RpcMessage protocolMsg2RpcMsg() {
+ RpcMessage rpcMessage = new RpcMessage();
+ rpcMessage.setMessageType(this.messageType); // overwritten below: every branch sets the type again
+ rpcMessage.setCompressor(CompressorType.NONE.getCode());
+
+ byte codecType = this.isSeataCodec ? SerializerType.SEATA.getCode() :
SerializerType.HESSIAN.getCode();
+ rpcMessage.setCodec(codecType);
+
+ if (this.isHeartbeat) {
+ if (this.isRequest) {
+
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST);
+ } else {
+
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE);
+ }
+ } else {
+ if (this.isRequest) { // NOTE(review): isRequest(byte) also accepts MSGTYPE_RESQUEST_SYNC, yet only ONEWAY is emitted - confirm intended
+
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
+ } else {
+ rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESPONSE);
+ }
+ }
+ rpcMessage.setBody(this.body);
+ rpcMessage.setId((int) this.id); // narrows the long id to int
+ return rpcMessage;
+ }
+
+ @Override
+ public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) { // NOTE(review): isAsync is left untouched here - confirm intended
+ this.body = rpcMessage.getBody();
+ this.id = rpcMessage.getId();
+ this.isRequest = isRequest(rpcMessage.getMessageType());
+ this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType());
+ this.isSeataCodec = rpcMessage.getCodec() ==
SerializerType.SEATA.getCode();
+ this.messageType = rpcMessage.getMessageType();
+ }
+
+ private boolean isHeartbeat(byte msgType) {
+ return msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
+ || msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE;
+ }
+
+ private boolean isRequest(byte msgType) {
+ return msgType == ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY
+ || msgType == ProtocolConstants.MSGTYPE_RESQUEST_SYNC;
+ }
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java
new file mode 100644
index 0000000000..10668cbdc6
--- /dev/null
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v1;
+
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The protocol v1 RPC message; converts to and from {@link RpcMessage}.
+ **/
+public class ProtocolRpcMessageV1 implements ProtocolRpcMessage {
+ private int id;
+ private byte messageType;
+ private byte codec;
+ private byte compressor;
+ private Map<String, String> headMap = new HashMap<>();
+ private Object body;
+
+ /**
+ * Gets the message id.
+ *
+ * @return the id
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Sets the message id.
+ *
+ * @param id the id
+ */
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ /**
+ * Gets the message body.
+ *
+ * @return the body
+ */
+ public Object getBody() {
+ return body;
+ }
+
+ /**
+ * Sets the message body.
+ *
+ * @param body the body
+ */
+ public void setBody(Object body) {
+ this.body = body;
+ }
+
+ /**
+ * Gets the codec code.
+ *
+ * @return the codec
+ */
+ public byte getCodec() {
+ return codec;
+ }
+
+ /**
+ * Sets the codec code.
+ *
+ * @param codec the codec code, copied to and from
+ * {@link RpcMessage#getCodec()} by the conversion methods
+ */
+ public void setCodec(byte codec) {
+ this.codec = codec;
+ }
+
+ /**
+ * Gets the compressor code.
+ *
+ * @return the compressor
+ */
+ public byte getCompressor() {
+ return compressor;
+ }
+
+ /**
+ * Sets the compressor code.
+ *
+ * @param compressor the compressor code, copied to and from
+ * {@link RpcMessage#getCompressor()} by the conversion methods
+ */
+ public void setCompressor(byte compressor) {
+ this.compressor = compressor;
+ }
+
+ /**
+ * Gets the head map.
+ *
+ * @return the head map (the internal instance, not a copy)
+ */
+ public Map<String, String> getHeadMap() {
+ return headMap;
+ }
+
+ /**
+ * Sets the head map.
+ *
+ * @param headMap the head map; stored by reference (not copied), and
+ * dereferenced by getHead/putHead, which therefore fail if it is null
+ */
+ public void setHeadMap(Map<String, String> headMap) {
+ this.headMap = headMap;
+ }
+
+ /**
+ * Looks up a single head entry.
+ *
+ * @param headKey the head key
+ * @return the value mapped to the key in the head map
+ */
+ public String getHead(String headKey) {
+ return headMap.get(headKey);
+ }
+
+ /**
+ * Puts a single entry into the head map.
+ *
+ * @param headKey the head key
+ * @param headValue the head value
+ */
+ public void putHead(String headKey, String headValue) {
+ headMap.put(headKey, headValue);
+ }
+
+ /**
+ * Gets the message type.
+ *
+ * @return the message type
+ */
+ public byte getMessageType() {
+ return messageType;
+ }
+
+ /**
+ * Sets the message type.
+ *
+ * @param messageType the message type
+ */
+ public void setMessageType(byte messageType) {
+ this.messageType = messageType;
+ }
+
+ @Override
+ public String toString() {
+ return StringUtils.toString(this);
+ }
+
+ @Override
+ public RpcMessage protocolMsg2RpcMsg() {
+ RpcMessage rpcMessage = new RpcMessage();
+ rpcMessage.setId(this.id);
+ rpcMessage.setMessageType(this.messageType);
+ rpcMessage.setCodec(this.codec);
+ rpcMessage.setCompressor(this.compressor);
+ rpcMessage.setHeadMap(this.headMap); // passes the same map instance, not a copy
+ rpcMessage.setBody(this.body);
+ return rpcMessage;
+ }
+
+
+ @Override
+ public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) {
+ this.body = rpcMessage.getBody();
+ this.headMap = rpcMessage.getHeadMap(); // keeps the returned map as-is (no copy)
+ this.id = rpcMessage.getId();
+ this.messageType = rpcMessage.getMessageType();
+ this.codec = rpcMessage.getCodec();
+ this.compressor = rpcMessage.getCompressor();
+ }
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
index eb65fd20cf..26ef52ffdc 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
@@ -27,7 +27,6 @@ import org.apache.seata.core.compressor.CompressorFactory;
import org.apache.seata.core.exception.DecodeException;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
-import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.serializer.SerializerServiceLoader;
import org.apache.seata.core.serializer.SerializerType;
@@ -125,7 +124,7 @@ public class ProtocolV1Decoder extends
LengthFieldBasedFrameDecoder {
byte compressorType = frame.readByte();
int requestId = frame.readInt();
- RpcMessage rpcMessage = new RpcMessage();
+ ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
rpcMessage.setCodec(codecType);
rpcMessage.setId(requestId);
rpcMessage.setCompressor(compressorType);
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
index 7c32e98f2d..575992fd87 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
@@ -66,8 +66,10 @@ public class ProtocolV1Encoder extends MessageToByteEncoder {
try {
if (msg instanceof RpcMessage) {
- RpcMessage rpcMessage = (RpcMessage) msg;
+ RpcMessage rpcMsg = (RpcMessage) msg;
+ ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
+ rpcMessage.rpcMsg2ProtocolMsg(rpcMsg);
int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
int headLength = ProtocolConstants.V1_HEAD_LENGTH;
diff --git
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
index 98b9bd7964..e35c124e30 100644
---
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
+++
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.DefaultPromise;
import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +50,8 @@ public class ClientChannelHandler extends
ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
- if (msg instanceof RpcMessage) {
- RpcMessage rpcMessage = (RpcMessage) msg;
+ if (msg instanceof ProtocolRpcMessage) {
+ RpcMessage rpcMessage = ((ProtocolRpcMessage)
msg).protocolMsg2RpcMsg();
int msgId = rpcMessage.getId();
DefaultPromise future = (DefaultPromise)
client.futureMap.remove(msgId);
if (future != null) {
@@ -58,7 +59,10 @@ public class ClientChannelHandler extends
ChannelInboundHandlerAdapter {
} else {
LOGGER.warn("miss msg id:{}", msgId);
}
+ }else {
+ LOGGER.warn("msg is not ProtocolRpcMessage");
}
+
}
@Override
diff --git
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
index 31025304a1..5ee0df7dee 100644
---
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
+++
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
@@ -27,8 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.core.model.BranchType;
-import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -80,7 +80,7 @@ public class ProtocolV1SerializerTest {
while (tag.getAndIncrement() < runTimes) {
try {
Future future = client.sendRpc(head, body);
- RpcMessage resp = (RpcMessage) future.get(10,
TimeUnit.SECONDS);
+ ProtocolRpcMessage resp = (ProtocolRpcMessage)
future.get(10, TimeUnit.SECONDS);
if (resp != null) {
success.incrementAndGet();
}
diff --git
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
index 58745a41ad..8b468d0e8f 100644
---
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
+++
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,11 +40,12 @@ public class ServerChannelHandler extends
ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel channel = ctx.channel();
- if (msg instanceof RpcMessage) {
- ((RpcMessage)
msg).setMessageType(ProtocolConstants.MSGTYPE_RESPONSE);
+ if (msg instanceof ProtocolRpcMessage) {
+ RpcMessage rpcMessage = ((ProtocolRpcMessage)
msg).protocolMsg2RpcMsg();
+ channel.writeAndFlush(rpcMessage);
+ } else {
+ LOGGER.error("rpcMessage type error");
}
-
- channel.writeAndFlush(msg);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]