This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 08eb0e65f6 feature: multi-version seata protocol support (#6226)
08eb0e65f6 is described below
commit 08eb0e65f6769048bef8533b66691a682854621f
Author: justabug <[email protected]>
AuthorDate: Thu Jun 27 14:23:43 2024 +0800
feature: multi-version seata protocol support (#6226)
---
.../org/apache/seata/core/protocol/RpcMessage.java | 10 ++
.../org/apache/seata/core/protocol/Version.java | 10 ++
.../rpc/netty/AbstractNettyRemotingServer.java | 40 ++++--
.../core/rpc/netty/CompatibleProtocolDecoder.java | 155 +++++++++++++++++++++
.../core/rpc/netty/CompatibleProtocolEncoder.java | 79 +++++++++++
.../seata/core/rpc/netty/NettyClientBootstrap.java | 6 +-
.../seata/core/rpc/netty/NettyServerBootstrap.java | 6 +-
.../seata/core/rpc/netty/ProtocolDecoder.java | 29 ++++
.../seata/core/rpc/netty/ProtocolEncoder.java | 28 ++++
.../seata/core/rpc/netty/v0/MessageCodecV0.java | 44 ++++++
.../core/rpc/netty/v0/ProtocolConstantsV0.java | 31 +++++
.../seata/core/rpc/netty/v0/ProtocolDecoderV0.java | 132 ++++++++++++++++++
.../seata/core/rpc/netty/v0/ProtocolEncoderV0.java | 106 ++++++++++++++
...otocolV1Decoder.java => ProtocolDecoderV1.java} | 60 ++------
...otocolV1Encoder.java => ProtocolEncoderV1.java} | 103 +++++++-------
.../core/rpc/processor/server/RegTmProcessor.java | 4 +-
.../core/rpc/netty/mockserver/RmClientTest.java | 6 -
.../core/rpc/netty/mockserver/TmClientTest.java | 6 -
.../seata/core/rpc/netty/v1/ProtocolV1Client.java | 7 +-
.../seata/core/rpc/netty/v1/ProtocolV1Server.java | 10 +-
20 files changed, 734 insertions(+), 138 deletions(-)
diff --git a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java
b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java
index 4f0963b20f..6a7e4d9da5 100644
--- a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java
+++ b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java
@@ -35,6 +35,8 @@ public class RpcMessage implements Serializable {
private Map<String, String> headMap = new HashMap<>();
private Object body;
+ private String otherSideVersion;
+
/**
* Gets id.
*
@@ -169,6 +171,14 @@ public class RpcMessage implements Serializable {
this.messageType = messageType;
}
+ /**
+ * Gets the other side version.
+ *
+ * @return the value last passed to {@code setOtherSideVersion}, or {@code null} if it was never set
+ */
+ public String getOtherSideVersion() {
+ return otherSideVersion;
+ }
+
+ /**
+ * Sets the other side version.
+ *
+ * @param otherSideVersion the version string to store on this message
+ */
+ public void setOtherSideVersion(String otherSideVersion) {
+ this.otherSideVersion = otherSideVersion;
+ }
+
@Override
public String toString() {
return StringUtils.toString(this);
diff --git a/core/src/main/java/org/apache/seata/core/protocol/Version.java
b/core/src/main/java/org/apache/seata/core/protocol/Version.java
index a9be3d9c09..d1a95924cf 100644
--- a/core/src/main/java/org/apache/seata/core/protocol/Version.java
+++ b/core/src/main/java/org/apache/seata/core/protocol/Version.java
@@ -150,6 +150,16 @@ public class Version {
return -1;
}
+    /**
+     * Derives the protocol version from an SDK version string.
+     *
+     * @param sdkVersion the SDK version string, in a form accepted by {@code convertVersion}
+     * @return {@code ProtocolConstants.VERSION_0} when the converted version is not greater than
+     *         the converted {@code VERSION_0_7_1}, otherwise {@code ProtocolConstants.VERSION_1}
+     * @throws IncompatibleVersionException propagated from {@code convertVersion}
+     */
+    public static byte calcProtocolVersion(String sdkVersion) throws IncompatibleVersionException {
+        long requested = convertVersion(sdkVersion);
+        long lastV0Release = convertVersion(VERSION_0_7_1);
+        return requested <= lastV0Release ? ProtocolConstants.VERSION_0 : ProtocolConstants.VERSION_1;
+    }
+
private static long calculatePartValue(String partNumeric, int size, int
index) {
return Long.parseLong(partNumeric) * Double.valueOf(Math.pow(100, size
- index)).longValue();
}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
index 72324c2689..9be9e79c3b 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
@@ -69,7 +69,7 @@ public abstract class AbstractNettyRemotingServer extends
AbstractNettyRemoting
if (channel == null) {
throw new RuntimeException("rm client is not connected. dbkey:" +
resourceId + ",clientId:" + clientId);
}
- RpcMessage rpcMessage = buildRequestMessage(msg,
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
+ RpcMessage rpcMessage = buildRequestMessage(channel, msg,
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
return super.sendSync(channel, rpcMessage,
NettyServerConfig.getRpcRequestTimeout());
}
@@ -78,7 +78,7 @@ public abstract class AbstractNettyRemotingServer extends
AbstractNettyRemoting
if (channel == null) {
throw new RuntimeException("client is not connected");
}
- RpcMessage rpcMessage = buildRequestMessage(msg,
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
+ RpcMessage rpcMessage = buildRequestMessage(channel, msg,
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
return super.sendSync(channel, rpcMessage,
NettyServerConfig.getRpcRequestTimeout());
}
@@ -87,26 +87,42 @@ public abstract class AbstractNettyRemotingServer extends
AbstractNettyRemoting
if (channel == null) {
throw new RuntimeException("client is not connected");
}
- RpcMessage rpcMessage = buildRequestMessage(msg,
ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
+ RpcMessage rpcMessage = buildRequestMessage(channel, msg,
ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
super.sendAsync(channel, rpcMessage);
}
@Override
public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel,
Object msg) {
- final Channel clientChannel = msg instanceof HeartbeatMessage
- ? channel
- : ChannelManager.getSameClientChannel(channel);
-
- if (clientChannel == null) {
- throw new RuntimeException("Not found client channel to response |
channel: " + channel);
+ Channel clientChannel = channel;
+ if (!(msg instanceof HeartbeatMessage)) {
+ clientChannel = ChannelManager.getSameClientChannel(channel);
}
-
- RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg
instanceof HeartbeatMessage
+ if (clientChannel != null) {
+ RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg,
msg instanceof HeartbeatMessage
? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
: ProtocolConstants.MSGTYPE_RESPONSE);
- super.sendAsync(clientChannel, rpcMsg);
+ super.sendAsync(clientChannel, rpcMsg);
+ } else {
+ throw new RuntimeException("channel is error.");
+ }
}
+
+    /**
+     * Builds a response message and stamps it with the version recorded in the channel's
+     * {@link RpcContext}.
+     *
+     * @param channel        the channel whose context supplies the version
+     * @param fromRpcMessage the message being answered
+     * @param msg            the response body
+     * @param messageType    the message type to set on the response
+     * @return the response message; its other-side version stays unset when no context is found
+     */
+    private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) {
+        RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, msg, messageType);
+        RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
+        // The lookup may find no context for this channel. Leaving the version unset is safe:
+        // CompatibleProtocolEncoder falls back to Version.getCurrent() for a blank version.
+        if (rpcContext != null) {
+            rpcMessage.setOtherSideVersion(rpcContext.getVersion());
+        }
+        return rpcMessage;
+    }
+
+    /**
+     * Builds a request message and stamps it with the version recorded in the channel's
+     * {@link RpcContext}.
+     *
+     * @param channel     the channel whose context supplies the version
+     * @param msg         the request body
+     * @param messageType the message type to set on the request
+     * @return the request message; its other-side version stays unset when no context is found
+     */
+    protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) {
+        RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType);
+        RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
+        // The lookup may find no context for this channel. Leaving the version unset is safe:
+        // CompatibleProtocolEncoder falls back to Version.getCurrent() for a blank version.
+        if (rpcContext != null) {
+            rpcMessage.setOtherSideVersion(rpcContext.getVersion());
+        }
+        return rpcMessage;
+    }
+
+
@Override
public void registerProcessor(int messageType, RemotingProcessor
processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor,
executor);
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
new file mode 100644
index 0000000000..d066984c23
--- /dev/null
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.seata.core.exception.DecodeException;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0;
+import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Frame decoder that recognises both wire formats below and hands each frame to the
+ * {@link ProtocolDecoder} registered for its protocol version.
+ * <pre>
+ * (> 0.7.0)
+ * 0     1     2     3     4     5     6     7     8     9    10     11    12    13    14    15    16
+ * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * |   magic   |Proto|     Full length       |    Head   | Msg |Seria|Compr|     RequestId         |
+ * |   code    |colVer|    (head+body)       |   Length  |Type |lizer|ess  |                       |
+ * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
+ *
+ * (<= 0.7.0)
+ * 0     1     2     3     4           6           8          10           12          14
+ * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * |   0xdada  |   flag    | typecode/ |                 requestid                     |
+ * |           |           | bodylength|                                               |
+ * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+
+ *
+ * </pre>
+ * <ul>
+ * <li>Full Length: include all data </li>
+ * <li>Head Length: include head data from magic code to head map. </li>
+ * <li>Body Length: Full Length - Head Length</li>
+ * </ul>
+ */
+public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class);
+
+    /** Bytes needed to tell the formats apart: the 2-byte magic code plus the byte after it. */
+    private static final int PROBE_LENGTH = 3;
+
+    /**
+     * Decoders keyed by protocol version. Held per instance and never modified after
+     * construction, so constructing another decoder cannot replace the map under a live one.
+     */
+    private final Map<Byte, ProtocolDecoder> protocolDecoderMap;
+
+    public CompatibleProtocolDecoder() {
+        // default is 8M
+        this(ProtocolConstants.MAX_FRAME_LENGTH);
+    }
+
+    public CompatibleProtocolDecoder(int maxFrameLength) {
+        /*
+        int maxFrameLength,
+        int lengthFieldOffset,  magic code is 2B, and version is 1B, and then FullLength. so value is 3
+        int lengthFieldLength,  FullLength is int(4B). so value is 4
+        int lengthAdjustment,   FullLength includes all data and 7 bytes were read before it,
+                                so the remaining length is (FullLength-7). so value is -7
+        int initialBytesToStrip we check magic code and version ourselves, so do not strip any bytes. so value is 0
+        */
+        super(maxFrameLength, 3, 4, -7, 0);
+        protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder()
+            .put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
+            .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1())
+            .build();
+    }
+
+    /**
+     * Decodes one message from {@code in}.
+     *
+     * @param ctx the handler context
+     * @param in  the inbound buffer
+     * @return the decoded message, or whatever {@code super.decode} returned when it did not
+     *         produce a frame
+     * @throws DecodeException wrapping any exception raised while framing or decoding
+     */
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        try {
+            if (isV0(in)) {
+                // v0 is not cut into frames by the length-field logic: the inbound buffer itself
+                // is handed to the v0 decoder and is not released here.
+                return decodeWith(ProtocolConstants.VERSION_0, in);
+            }
+
+            Object decoded = super.decode(ctx, in);
+            if (!(decoded instanceof ByteBuf)) {
+                return decoded;
+            }
+
+            ByteBuf frame = (ByteBuf) decoded;
+            try {
+                // decideVersion is inside the try so the frame is released even when the
+                // magic code check throws.
+                return decodeWith(decideVersion(frame), frame);
+            } finally {
+                frame.release();
+            }
+        } catch (Exception exx) {
+            LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
+            throw new DecodeException(exx);
+        }
+    }
+
+    /** Looks up the decoder for {@code version} and lets it decode {@code frame}. */
+    private Object decodeWith(byte version, ByteBuf frame) {
+        ProtocolDecoder decoder = protocolDecoderMap.get(version);
+        if (decoder == null) {
+            throw new UnsupportedOperationException("Unsupported version: " + version);
+        }
+        return decoder.decodeFrame(frame);
+    }
+
+    /**
+     * Reads the protocol version byte that follows the magic code, without consuming it.
+     *
+     * @param in the frame; anything that is not a {@link ByteBuf} yields {@code -1}
+     * @return the version byte, or {@code -1} when {@code in} is not a {@link ByteBuf}
+     * @throws IllegalArgumentException if the first two bytes are not the magic code
+     */
+    protected byte decideVersion(Object in) {
+        if (in instanceof ByteBuf) {
+            ByteBuf frame = (ByteBuf) in;
+            frame.markReaderIndex();
+            byte b0 = frame.readByte();
+            byte b1 = frame.readByte();
+            if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
+                || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
+                throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
+            }
+
+            byte version = frame.readByte();
+            frame.resetReaderIndex();
+            return version;
+        }
+        return -1;
+    }
+
+
+    /**
+     * Tells whether the buffer starts with a v0 header: the magic code followed by a zero byte.
+     * The reader index is left where it was.
+     *
+     * @param in the inbound buffer
+     * @return {@code true} for a v0 header; {@code false} otherwise, including when fewer than
+     *         three bytes are readable yet
+     */
+    protected boolean isV0(ByteBuf in) {
+        // Not enough bytes to probe: report "not v0" so the length-field decoder can wait for
+        // more input instead of readByte() running past the writer index.
+        if (in.readableBytes() < PROBE_LENGTH) {
+            return false;
+        }
+        boolean isV0 = false;
+        in.markReaderIndex();
+        byte b0 = in.readByte();
+        byte b1 = in.readByte();
+        // v1/v2/v3 : b2 = version
+        // v0 : 1st byte in FLAG(2byte:0x10/0x20/0x40/0x80)
+        byte b2 = in.readByte();
+        if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0
+            && ProtocolConstants.MAGIC_CODE_BYTES[1] == b1
+            && 0 == b2) {
+            isV0 = true;
+        }
+
+        in.resetReaderIndex();
+        return isV0;
+    }
+
+    /**
+     * @param version a protocol version byte
+     * @return whether {@code version} equals {@code ProtocolConstants.VERSION_0}
+     */
+    protected boolean isV0(byte version) {
+        return version == ProtocolConstants.VERSION_0;
+    }
+}
\ No newline at end of file
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
new file mode 100644
index 0000000000..e588b92b8e
--- /dev/null
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.protocol.Version;
+import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0;
+import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Compatible Protocol Encoder: picks the {@link ProtocolEncoder} that matches the version
+ * carried by the outgoing {@link RpcMessage}.
+ * <ul>
+ * <li>Full Length: include all data </li>
+ * <li>Head Length: include head data from magic code to head map. </li>
+ * <li>Body Length: Full Length - Head Length</li>
+ * </ul>
+ */
+public class CompatibleProtocolEncoder extends MessageToByteEncoder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolEncoder.class);
+
+    /**
+     * Encoders keyed by protocol version. Held per instance and never modified after
+     * construction, so constructing another encoder cannot replace the map under a live one.
+     */
+    private final Map<Byte, ProtocolEncoder> protocolEncoderMap;
+
+    public CompatibleProtocolEncoder() {
+        super();
+        protocolEncoderMap = ImmutableMap.<Byte, ProtocolEncoder>builder()
+            .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0())
+            .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1())
+            .build();
+    }
+
+    /**
+     * Encodes {@code msg} into {@code out} using the protocol version derived from the
+     * message's other-side version, or from {@code Version.getCurrent()} when that is blank.
+     * Failures are logged and not rethrown.
+     *
+     * @param ctx the handler context
+     * @param msg the outgoing message; only {@link RpcMessage} is supported
+     * @param out the buffer to write the encoded bytes into
+     */
+    @Override
+    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
+        try {
+            if (!(msg instanceof RpcMessage)) {
+                throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
+            }
+
+            RpcMessage rpcMessage = (RpcMessage) msg;
+            String sdkVersion = rpcMessage.getOtherSideVersion();
+            if (StringUtils.isBlank(sdkVersion)) {
+                sdkVersion = Version.getCurrent();
+            }
+            byte protocolVersion = Version.calcProtocolVersion(sdkVersion);
+            ProtocolEncoder encoder = protocolEncoderMap.get(protocolVersion);
+            if (encoder == null) {
+                throw new UnsupportedOperationException("Unsupported protocolVersion: " + protocolVersion);
+            }
+
+            encoder.encode(rpcMessage, out);
+        } catch (Throwable e) {
+            // Log-and-continue is kept as in the original: encoding errors never propagate
+            // to the caller from this method.
+            LOGGER.error("Encode request error!", e);
+        }
+    }
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
index 63b15a7466..4867f86bcf 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
@@ -35,8 +35,6 @@ import io.netty.util.internal.PlatformDependent;
import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.core.rpc.RemotingBootstrap;
-import org.apache.seata.core.rpc.netty.v1.ProtocolV1Decoder;
-import org.apache.seata.core.rpc.netty.v1.ProtocolV1Encoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,8 +132,8 @@ public class NettyClientBootstrap implements
RemotingBootstrap {
new
IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
- .addLast(new ProtocolV1Decoder())
- .addLast(new ProtocolV1Encoder());
+ .addLast(new CompatibleProtocolDecoder())
+ .addLast(new CompatibleProtocolEncoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
index 6c9a325882..b847b2a96d 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
@@ -37,8 +37,6 @@ import org.apache.seata.common.XID;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.rpc.RemotingBootstrap;
-import org.apache.seata.core.rpc.netty.v1.ProtocolV1Decoder;
-import org.apache.seata.core.rpc.netty.v1.ProtocolV1Encoder;
import org.apache.seata.discovery.registry.MultiRegistryFactory;
import org.apache.seata.discovery.registry.RegistryService;
import org.slf4j.Logger;
@@ -161,8 +159,8 @@ public class NettyServerBootstrap implements
RemotingBootstrap {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new
IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
- .addLast(new ProtocolV1Decoder())
- .addLast(new ProtocolV1Encoder());
+ .addLast(new CompatibleProtocolDecoder())
+ .addLast(new CompatibleProtocolEncoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java
new file mode 100644
index 0000000000..42a7c75c04
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * The protocol decoder: turns the bytes of one frame into a {@code ProtocolRpcMessage}.
+ *
+ **/
+public interface ProtocolDecoder {
+
+ /**
+ * Decodes a frame.
+ *
+ * @param in the buffer to read the frame from
+ * @return the decoded message
+ */
+ ProtocolRpcMessage decodeFrame(ByteBuf in);
+
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java
new file mode 100644
index 0000000000..6c91164fff
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.RpcMessage;
+
+/**
+ * The protocol encoder: writes an {@link RpcMessage} into a buffer.
+ *
+ **/
+public interface ProtocolEncoder {
+ /**
+ * Encodes a message.
+ *
+ * @param rpcMessage the message to encode
+ * @param out the buffer the encoded bytes are written to
+ */
+ void encode(RpcMessage rpcMessage, ByteBuf out);
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java
new file mode 100644
index 0000000000..ab1d4f7471
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.MessageTypeAware;
+
+/**
+ * The interface Message codec (protocol v0).
+ *
+ * @param <T> the type of the extra argument accepted by {@link #decode(ByteBuf, Object)}
+ */
+public interface MessageCodecV0<T> extends MessageTypeAware {
+
+ /**
+ * Encodes this message.
+ *
+ * @return the encoded bytes
+ */
+ byte[] encode();
+
+ /**
+ * Decodes this message from the given buffer.
+ *
+ * @param in the buffer to read from
+ * @return a boolean result; NOTE(review): presumably whether decoding succeeded, confirm
+ * against the implementations
+ */
+ boolean decode(ByteBuf in);
+
+ /**
+ * Decodes this message from the given buffer with an additional argument.
+ *
+ * @param in the buffer to read from
+ * @param req NOTE(review): presumably the request this message relates to; the contract is
+ * not defined here, confirm against the implementations
+ * @return a boolean result; NOTE(review): presumably whether decoding succeeded, confirm
+ */
+ boolean decode(ByteBuf in, T req);
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolConstantsV0.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolConstantsV0.java
new file mode 100644
index 0000000000..65b3634059
--- /dev/null
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolConstantsV0.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+/**
+ * protocol v0 constants
+ *
+ **/
+public class ProtocolConstantsV0 {
+    /** Magic code written at the start of every v0 frame. */
+    public static final short MAGIC = (short) 0xdada;
+
+    /** v0 header size in bytes: magic (2) + flag (2) + typecode/bodylength (2) + request id (8). */
+    public static final int HEAD_LENGTH = 14;
+
+    /** Flag bit: the message is a request. */
+    public static final short FLAG_REQUEST = 0x80;
+    /** Flag bit: the message is asynchronous. */
+    public static final short FLAG_ASYNC = 0x40;
+    /** Flag bit: the message is a heartbeat. */
+    public static final short FLAG_HEARTBEAT = 0x20;
+    /** Flag bit: the body uses the Seata codec. */
+    public static final short FLAG_SEATA_CODEC = 0x10;
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java
new file mode 100644
index 0000000000..42e112a2f6
--- /dev/null
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.HeartbeatMessage;
+
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.rpc.netty.ProtocolDecoder;
+import org.apache.seata.core.serializer.Serializer;
+import org.apache.seata.core.serializer.SerializerServiceLoader;
+import org.apache.seata.core.serializer.SerializerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <pre>
+ * seata-version < 0.7
+ * Only used when TC receives a request from RM/TM.
+ * 0     1     2     3     4           6           8          10           12          14         16
+ * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * |   0xdada  |   flag    | typecode/ |                 requestid                     |           |
+ * |           |           | bodylength|                                               |           |
+ * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+           +
+ * |                                    ... ...                                                    |
+ * +                                                                                               +
+ * |                                     body                                                      |
+ * +                                                                                               +
+ * |                                    ... ...                                                    |
+ * +-----------------------------------------------------------------------------------------------+
+ *
+ * </pre>
+ * <ul>
+ * <li>flag: msg type </li>
+ * <li>typecode: action type code </li>
+ * <li>bodylength: body Length </li>
+ * <li>requestid: request id</li>
+ * </ul>
+ *
+ * @see ProtocolEncoderV0
+ */
+public class ProtocolDecoderV0 implements ProtocolDecoder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV0.class);
+
+    /**
+     * Decodes one v0 message from {@code in}.
+     *
+     * @param in the buffer positioned at the magic code
+     * @return the decoded message; heartbeats carry {@code HeartbeatMessage.PING} or
+     *         {@code HeartbeatMessage.PONG} as body
+     * @throws IllegalArgumentException if fewer than {@code HEAD_LENGTH} bytes are readable, or
+     *                                  fewer body bytes are readable than the header announces
+     */
+    @Override
+    public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) {
+        ProtocolRpcMessageV0 rpcMessage = new ProtocolRpcMessageV0();
+        if (in.readableBytes() < ProtocolConstantsV0.HEAD_LENGTH) {
+            throw new IllegalArgumentException("Nothing to decode.");
+        }
+
+        in.markReaderIndex();
+        // The 2-byte magic code is consumed but its value is not used here.
+        in.skipBytes(2);
+        int flag = in.readShort();
+
+        boolean isHeartbeat = (ProtocolConstantsV0.FLAG_HEARTBEAT & flag) > 0;
+        boolean isRequest = (ProtocolConstantsV0.FLAG_REQUEST & flag) > 0;
+        boolean isSeataCodec = (ProtocolConstantsV0.FLAG_SEATA_CODEC & flag) > 0;
+        rpcMessage.setSeataCodec(isSeataCodec);
+
+        // The same 2-byte slot holds the type code (Seata codec) or the body length (otherwise).
+        short bodyLength = 0;
+        short typeCode = 0;
+        if (!isSeataCodec) {
+            bodyLength = in.readShort();
+        } else {
+            typeCode = in.readShort();
+        }
+        long msgId = in.readLong();
+        rpcMessage.setId(msgId);
+        if (isHeartbeat) {
+            rpcMessage.setAsync(true);
+            rpcMessage.setHeartbeat(isHeartbeat);
+            rpcMessage.setRequest(isRequest);
+            if (isRequest) {
+                rpcMessage.setBody(HeartbeatMessage.PING);
+            } else {
+                rpcMessage.setBody(HeartbeatMessage.PONG);
+            }
+
+            return rpcMessage;
+        }
+
+        if (bodyLength > 0 && in.readableBytes() < bodyLength) {
+            in.resetReaderIndex();
+            throw new IllegalArgumentException("readableBytes < bodyLength");
+        }
+
+        rpcMessage.setAsync((ProtocolConstantsV0.FLAG_ASYNC & flag) > 0);
+        rpcMessage.setHeartbeat(false);
+        rpcMessage.setRequest(isRequest);
+
+        try {
+            int length = in.readableBytes();
+
+            // fill messageType in v0: two big-endian type-code bytes, then the remaining
+            // readable bytes copied straight behind them
+            byte[] payload = new byte[2 + length];
+            payload[0] = (byte) (0x00FF & (typeCode >> 8));
+            payload[1] = (byte) (0x00FF & typeCode);
+            in.readBytes(payload, 2, length);
+
+            byte codecType = isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode();
+            Serializer serializer =
+                SerializerServiceLoader.load(SerializerType.getByCode(codecType), ProtocolConstants.VERSION_0);
+            rpcMessage.setBody(serializer.deserialize(payload));
+        } catch (Exception e) {
+            LOGGER.error("decode error", e);
+            throw e;
+        }
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Receive:" + rpcMessage.getBody() + ", messageId:" + msgId);
+        }
+        return rpcMessage;
+    }
+
+
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java
new file mode 100644
index 0000000000..3fc447b281
--- /dev/null
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.HeartbeatMessage;
+import org.apache.seata.core.protocol.MessageTypeAware;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolEncoder;
+import org.apache.seata.core.serializer.Serializer;
+import org.apache.seata.core.serializer.SerializerServiceLoader;
+import org.apache.seata.core.serializer.SerializerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <pre>
+ * seata-version < 0.7
+ * Only used when TC sends a request to RM/TM.
+ * 0     1     2     3     4           6           8          10           12          14         16
+ * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * |   0xdada  |   flag    | typecode/ |                 requestid                     |           |
+ * |           |           | bodylength|                                               |           |
+ * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+           +
+ * |                                    ... ...                                                    |
+ * +                                                                                               +
+ * |                                     body                                                      |
+ * +                                                                                               +
+ * |                                    ... ...                                                    |
+ * +-----------------------------------------------------------------------------------------------+
+ *
+ * </pre>
+ * <ul>
+ * <li>flag: msg type </li>
+ * <li>typecode: action type code </li>
+ * <li>bodylength: body Length </li>
+ * <li>requestid: request id</li>
+ * </ul>
+ *
+ * @see ProtocolDecoderV0
+ */
+public class ProtocolEncoderV0 implements ProtocolEncoder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV0.class);
+
+    /**
+     * Encodes {@code message} in the v0 layout. Everything that can fail (conversion,
+     * serialization, type-code lookup) runs before the first byte is written, so a failure
+     * leaves {@code out} untouched instead of holding a partial header. Failures are logged
+     * and not rethrown.
+     *
+     * @param message the message to encode
+     * @param out     the buffer the frame is written to
+     */
+    @Override
+    public void encode(RpcMessage message, ByteBuf out) {
+        try {
+            byte codec = message.getCodec();
+            ProtocolRpcMessageV0 msg = new ProtocolRpcMessageV0();
+            msg.rpcMsg2ProtocolMsg(message);
+
+            int flag = (msg.isAsync() ? ProtocolConstantsV0.FLAG_ASYNC : 0)
+                | (msg.isHeartbeat() ? ProtocolConstantsV0.FLAG_HEARTBEAT : 0)
+                | (msg.isRequest() ? ProtocolConstantsV0.FLAG_REQUEST : 0)
+                | (msg.isSeataCodec() ? ProtocolConstantsV0.FLAG_SEATA_CODEC : 0);
+            Object body = msg.getBody();
+
+            if (body instanceof HeartbeatMessage) {
+                // Heartbeats are header-only: the typecode/bodylength slot is zero.
+                out.writeShort(ProtocolConstantsV0.MAGIC);
+                out.writeShort((short) flag);
+                out.writeShort((short) 0);
+                out.writeLong(msg.getId());
+                return;
+            }
+
+            Serializer serializer =
+                SerializerServiceLoader.load(SerializerType.getByCode(codec), ProtocolConstants.VERSION_0);
+            byte[] bodyBytes = serializer.serialize(body);
+
+            // The 2-byte slot after the flag carries the type code (Seata codec) or the body length.
+            int typeCodeOrLength;
+            if (msg.isSeataCodec()) {
+                if (!(body instanceof MessageTypeAware)) {
+                    // Without a type code the header would be two bytes short and the peer
+                    // would misread the request id; refuse to emit such a frame.
+                    throw new IllegalArgumentException("Seata codec body is not MessageTypeAware: "
+                        + (body == null ? null : body.getClass()));
+                }
+                typeCodeOrLength = ((MessageTypeAware) body).getTypeCode();
+            } else {
+                typeCodeOrLength = bodyBytes.length;
+            }
+
+            out.writeShort(ProtocolConstantsV0.MAGIC);
+            out.writeShort((short) flag);
+            out.writeShort(typeCodeOrLength);
+            out.writeLong(msg.getId());
+            if (bodyBytes != null) {
+                out.writeBytes(bodyBytes);
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Send:" + msg.getBody());
+            }
+        } catch (Throwable e) {
+            // Log-and-continue is kept as in the original.
+            LOGGER.error("Encode request error!", e);
+        }
+    }
+}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
similarity index 76%
rename from
core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
rename to
core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
index 26ef52ffdc..9ca4977944 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
@@ -20,19 +20,19 @@ import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
-import org.apache.seata.core.exception.DecodeException;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.rpc.netty.ProtocolDecoder;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.serializer.SerializerServiceLoader;
import org.apache.seata.core.serializer.SerializerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12
13 14 15 16
@@ -56,57 +56,24 @@ import org.slf4j.LoggerFactory;
* </p>
* https://github.com/seata/seata/issues/893
*
- * @see ProtocolV1Encoder
+ * @see ProtocolEncoderV1
+ * @author Geng Zhang
+ * @see ProtocolEncoderV1
* @since 0.7.0
*/
-public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProtocolV1Decoder.class);
+public class ProtocolDecoderV1 implements ProtocolDecoder {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProtocolDecoderV1.class);
private final List<SerializerType> supportDeSerializerTypes;
-
- public ProtocolV1Decoder() {
- // default is 8M
- this(ProtocolConstants.MAX_FRAME_LENGTH);
- }
-
- public ProtocolV1Decoder(int maxFrameLength) {
- /*
- int maxFrameLength,
- int lengthFieldOffset, magic code is 2B, and version is 1B, and then
FullLength. so value is 3
- int lengthFieldLength, FullLength is int(4B). so values is 4
- int lengthAdjustment, FullLength include all data and read 7 bytes
before, so the left length is (FullLength-7). so values is -7
- int initialBytesToStrip we will check magic code and version self, so
do not strip any bytes. so values is 0
- */
- super(maxFrameLength, 3, 4, -7, 0);
+ public ProtocolDecoderV1() {
supportDeSerializerTypes =
SerializerServiceLoader.getSupportedSerializers();
if (supportDeSerializerTypes.isEmpty()) {
throw new IllegalArgumentException("No serializer found");
- }
- }
+ } }
@Override
- protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws
Exception {
- Object decoded;
- try {
- decoded = super.decode(ctx, in);
- if (decoded instanceof ByteBuf) {
- ByteBuf frame = (ByteBuf)decoded;
- try {
- return decodeFrame(frame);
- } finally {
- frame.release();
- }
- }
- } catch (Exception exx) {
- LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
- throw new DecodeException(exx);
- }
- return decoded;
- }
-
- public Object decodeFrame(ByteBuf frame) {
+ public ProtocolRpcMessage decodeFrame(ByteBuf frame) {
byte b0 = frame.readByte();
byte b1 = frame.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
@@ -115,7 +82,6 @@ public class ProtocolV1Decoder extends
LengthFieldBasedFrameDecoder {
}
byte version = frame.readByte();
- // TODO check version compatible here
int fullLength = frame.readInt();
short headLength = frame.readShort();
@@ -154,13 +120,11 @@ public class ProtocolV1Decoder extends
LengthFieldBasedFrameDecoder {
Serializer serializer =
SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
rpcMessage.setBody(serializer.deserialize(bs));
} else {
- throw new IllegalArgumentException("SerializerType not
match: " + protocolType.name());
+ throw new IllegalArgumentException("SerializerType not
match");
}
}
}
return rpcMessage;
}
-
-
}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
similarity index 51%
rename from
core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
rename to
core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
index 575992fd87..14cbcdb55d 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
@@ -17,8 +17,7 @@
package org.apache.seata.core.rpc.netty.v1;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.seata.core.rpc.netty.ProtocolEncoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
@@ -54,70 +53,70 @@ import java.util.Map;
* </p>
* https://github.com/seata/seata/issues/893
*
- * @see ProtocolV1Decoder
+ * @author Geng Zhang
+ * @see ProtocolDecoderV1
* @since 0.7.0
*/
-public class ProtocolV1Encoder extends MessageToByteEncoder {
+public class ProtocolEncoderV1 implements ProtocolEncoder {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProtocolV1Encoder.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProtocolEncoderV1.class);
- @Override
- public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
- try {
- if (msg instanceof RpcMessage) {
- RpcMessage rpcMsg = (RpcMessage) msg;
+ public void encode(RpcMessage message, ByteBuf out) {
+ try {
- ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
- rpcMessage.rpcMsg2ProtocolMsg(rpcMsg);
- int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
- int headLength = ProtocolConstants.V1_HEAD_LENGTH;
+ ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
+ rpcMessage.rpcMsg2ProtocolMsg(message);
- byte messageType = rpcMessage.getMessageType();
- out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
- out.writeByte(ProtocolConstants.VERSION);
- // full Length(4B) and head length(2B) will fix in the end.
- out.writerIndex(out.writerIndex() + 6);
- out.writeByte(messageType);
- out.writeByte(rpcMessage.getCodec());
- out.writeByte(rpcMessage.getCompressor());
- out.writeInt(rpcMessage.getId());
+ int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
+ int headLength = ProtocolConstants.V1_HEAD_LENGTH;
- // direct write head with zero-copy
- Map<String, String> headMap = rpcMessage.getHeadMap();
- if (headMap != null && !headMap.isEmpty()) {
- int headMapBytesLength =
HeadMapSerializer.getInstance().encode(headMap, out);
- headLength += headMapBytesLength;
- fullLength += headMapBytesLength;
- }
+ byte messageType = rpcMessage.getMessageType();
+ out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
+ out.writeByte(ProtocolConstants.VERSION_1);
+ // full Length(4B) and head length(2B) will fix in the end.
+ out.writerIndex(out.writerIndex() + 6);
+ out.writeByte(messageType);
+ out.writeByte(rpcMessage.getCodec());
+ out.writeByte(rpcMessage.getCompressor());
+ out.writeInt(rpcMessage.getId());
- byte[] bodyBytes = null;
- if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
- && messageType !=
ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
- // heartbeat has no body
- Serializer serializer =
SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()),
ProtocolConstants.VERSION_1);
- bodyBytes = serializer.serialize(rpcMessage.getBody());
- Compressor compressor =
CompressorFactory.getCompressor(rpcMessage.getCompressor());
- bodyBytes = compressor.compress(bodyBytes);
- fullLength += bodyBytes.length;
- }
+ // direct write head with zero-copy
+ Map<String, String> headMap = rpcMessage.getHeadMap();
+ if (headMap != null && !headMap.isEmpty()) {
+ int headMapBytesLength =
HeadMapSerializer.getInstance().encode(headMap, out);
+ headLength += headMapBytesLength;
+ fullLength += headMapBytesLength;
+ }
- if (bodyBytes != null) {
- out.writeBytes(bodyBytes);
- }
+ byte[] bodyBytes = null;
+ if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
+ && messageType !=
ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
+ // heartbeat has no body
+ Serializer serializer =
SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()),
ProtocolConstants.VERSION_1);
+ bodyBytes = serializer.serialize(rpcMessage.getBody());
+ Compressor compressor =
CompressorFactory.getCompressor(rpcMessage.getCompressor());
+ bodyBytes = compressor.compress(bodyBytes);
+ fullLength += bodyBytes.length;
+ }
- // fix fullLength and headLength
- int writeIndex = out.writerIndex();
- // skip magic code(2B) + version(1B)
- out.writerIndex(writeIndex - fullLength + 3);
- out.writeInt(fullLength);
- out.writeShort(headLength);
- out.writerIndex(writeIndex);
- } else {
- throw new UnsupportedOperationException("Not support this
class:" + msg.getClass());
+ if (bodyBytes != null) {
+ out.writeBytes(bodyBytes);
}
+
+ // fix fullLength and headLength
+ int writeIndex = out.writerIndex();
+ // skip magic code(2B) + version(1B)
+ out.writerIndex(writeIndex - fullLength + 3);
+ out.writeInt(fullLength);
+ out.writeShort(headLength);
+ out.writerIndex(writeIndex);
+
+
} catch (Throwable e) {
LOGGER.error("Encode request error!", e);
+ // todo
+ throw e;
}
}
}
diff --git
a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java
b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java
index 0afee867f8..6090232c6c 100644
---
a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java
+++
b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java
@@ -89,8 +89,8 @@ public class RegTmProcessor implements RemotingProcessor {
}
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
if (isSuccess && LOGGER.isInfoEnabled()) {
- LOGGER.info("TM register success,message:{},channel:{},client
version:{}", message, ctx.channel(),
- message.getVersion());
+ LOGGER.info("TM register success,message:{},channel:{},client
version:{},client protocol-version:{}"
+ , message, ctx.channel(), message.getVersion(),
rpcMessage.getOtherSideVersion());
}
}
diff --git
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/RmClientTest.java
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/RmClientTest.java
index 671f1d4431..c7f100bce7 100644
---
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/RmClientTest.java
+++
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/RmClientTest.java
@@ -17,8 +17,6 @@
package org.apache.seata.core.rpc.netty.mockserver;
import io.netty.channel.Channel;
-import org.apache.seata.common.ConfigurationKeys;
-import org.apache.seata.common.ConfigurationTestHelper;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
@@ -27,13 +25,9 @@ import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.rpc.netty.ChannelManagerTestHelper;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import
org.apache.seata.integration.tx.api.interceptor.parser.DefaultResourceRegisterParser;
-import org.apache.seata.mockserver.MockServer;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.RMClient;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/TmClientTest.java
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/TmClientTest.java
index b204b0da81..1bd2325a9d 100644
---
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/TmClientTest.java
+++
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/TmClientTest.java
@@ -17,21 +17,15 @@
package org.apache.seata.core.rpc.netty.mockserver;
import io.netty.channel.Channel;
-import org.apache.seata.common.ConfigurationKeys;
-import org.apache.seata.common.ConfigurationTestHelper;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.model.TransactionManager;
import org.apache.seata.core.protocol.ResultCode;
import org.apache.seata.core.rpc.netty.ChannelManagerTestHelper;
import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
import org.apache.seata.mockserver.MockCoordinator;
-import org.apache.seata.mockserver.MockServer;
import org.apache.seata.tm.DefaultTransactionManager;
import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
index 5d20e18b1f..3f52ed63c5 100644
---
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
+++
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
@@ -45,6 +45,8 @@ import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
import org.apache.seata.core.serializer.SerializerType;
+import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder;
+import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,8 +82,8 @@ public class ProtocolV1Client {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast(new ProtocolV1Encoder());
- pipeline.addLast(new ProtocolV1Decoder(8 * 1024 * 1024));
+ pipeline.addLast(new CompatibleProtocolEncoder());
+ pipeline.addLast(new CompatibleProtocolDecoder(8 * 1024 *
1024));
pipeline.addLast(new
ClientChannelHandler(ProtocolV1Client.this));
}
});
@@ -124,6 +126,7 @@ public class ProtocolV1Client {
rpcMessage.setBody(body);
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
+
if (channel != null) {
DefaultPromise promise = new DefaultPromise(defaultEventExecutor);
futureMap.put(msgId, promise);
diff --git
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
index 9a0d067687..20a2fdbcc5 100644
---
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
+++
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
@@ -35,6 +35,12 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder;
+import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
/**
*/
@@ -67,8 +73,8 @@ public class ProtocolV1Server {
@Override
protected void initChannel(Channel channel) throws
Exception {
ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast(new ProtocolV1Decoder(8 * 1024 *
1024));
- pipeline.addLast(new ProtocolV1Encoder());
+ pipeline.addLast(new CompatibleProtocolDecoder(8 *
1024 * 1024));
+ pipeline.addLast(new CompatibleProtocolEncoder());
pipeline.addLast(new ServerChannelHandler());
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]