This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new 60a81b1e93 feature: add grpc serializer (#6992) 60a81b1e93 is described below commit 60a81b1e93b6e2833df99c89ee21c5687512a2e7 Author: yiqi <77573225+pleasegivemethec...@users.noreply.github.com> AuthorDate: Sun Nov 10 21:02:06 2024 +0800 feature: add grpc serializer (#6992) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../seata/core/rpc/netty/grpc/GrpcDecoder.java | 2 +- .../seata/core/rpc/netty/grpc/GrpcEncoder.java | 4 +- .../seata/core/serializer/SerializerType.java | 9 +++- .../seata/serializer/protobuf/GrpcSerializer.java | 60 ++++++++++++++++++++++ .../org.apache.seata.core.serializer.Serializer | 3 +- test/pom.xml | 5 ++ .../seata/core/rpc/netty/mockserver/GrpcTest.java | 13 +++-- .../org.apache.seata.core.serializer.Serializer | 2 +- 10 files changed, 89 insertions(+), 11 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 8ba5550332..cfd89f80ce 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -8,6 +8,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6881](https://github.com/apache/incubator-seata/pull/6881)] support grpc - [[#6864](https://github.com/apache/incubator-seata/pull/6864)] support shentong database - [[#6974](https://github.com/apache/incubator-seata/pull/6974)] support fastjson2 undolog parser +- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] support grpc serializer ### bugfix: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 915a039cbb..6aeb138b52 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -8,6 +8,7 @@ - [[#6881](https://github.com/apache/incubator-seata/pull/6881)] client和server支持grpc协议 - [[#6864](https://github.com/apache/incubator-seata/pull/6864)] 支持神通数据库(oscar) - [[#6974](https://github.com/apache/incubator-seata/pull/6974)] 支持UndoLog的fastjson2序列化方式 +- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] 支持grpc序列化器 ### bugfix: diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index e227d5dc7c..5544e994a5 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -94,7 +94,7 @@ public class GrpcDecoder extends ChannelDuplexHandler { bodyBytes = compressor.decompress(bodyBytes); } String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); - int codec = StringUtils.isBlank(codecValue) ? SerializerType.PROTOBUF.getCode() + int codec = StringUtils.isBlank(codecValue) ? SerializerType.GRPC.getCode() : Integer.parseInt(codecValue); SerializerType serializerType = SerializerType.getByCode(codec); rpcMsg.setCodec(serializerType.getCode()); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java index dbbbfe1be4..2601a2f0a6 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -64,14 +64,14 @@ public class GrpcEncoder extends ChannelOutboundHandlerAdapter { ByteString dataBytes; if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.PROTOBUF.getCode())); + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode())); byte[] serializedBytes = serializer.serialize(body); Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); dataBytes = ByteString.copyFrom(compressor.compress(serializedBytes)); } else { dataBytes = ByteString.EMPTY; } - headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.PROTOBUF.getCode())); + headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())); headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor())); GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder() .putAllHeadMap(headMap) diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java index 56fd8136d1..39772b34ce 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java @@ -70,7 +70,14 @@ public enum SerializerType { * Math.pow(2, 6) */ FASTJSON2((byte)0x64), - ; + + + /** + * The grpc + * <p> + * Math.pow(2, 7) + */ + GRPC((byte) 0x128); private final byte code; diff --git a/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java new file mode 100644 index 0000000000..2ef8eac784 --- /dev/null +++ b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.protobuf; + +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import org.apache.seata.common.exception.ShouldNeverHappenException; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.serializer.protobuf.convertor.PbConvertor; +import org.apache.seata.serializer.protobuf.manager.ProtobufConvertManager; + +@LoadLevel(name = "GRPC") +public class GrpcSerializer implements Serializer { + @Override + public <T> byte[] serialize(T t) { + PbConvertor pbConvertor = ProtobufConvertManager.getInstance() + .fetchConvertor(t.getClass().getName()); + Any grpcBody = Any.pack((Message) pbConvertor.convert2Proto(t)); + + return grpcBody.toByteArray(); + } + + @Override + public <T> T deserialize(byte[] bytes) { + try { + Any body = Any.parseFrom(bytes); + final Class clazz = ProtobufConvertManager.getInstance().fetchProtoClass(getTypeNameFromTypeUrl(body.getTypeUrl())); + if (body.is(clazz)) { + Object ob = body.unpack(clazz); + PbConvertor pbConvertor = ProtobufConvertManager.getInstance().fetchReversedConvertor(clazz.getName()); + + return (T) pbConvertor.convert2Model(ob); + } + } catch (Throwable e) { + throw new ShouldNeverHappenException("GrpcSerializer deserialize error", e); + } + + return null; + } + + private String getTypeNameFromTypeUrl(String typeUri) { + int pos = typeUri.lastIndexOf('/'); + return pos == -1 ? "" : typeUri.substring(pos + 1); + } +} diff --git a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer index 71098c5367..f6fbf709de 100644 --- a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer +++ b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer @@ -14,4 +14,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.seata.serializer.protobuf.ProtobufSerializer \ No newline at end of file +org.apache.seata.serializer.protobuf.ProtobufSerializer +org.apache.seata.serializer.protobuf.GrpcSerializer \ No newline at end of file diff --git a/test/pom.xml b/test/pom.xml index d35f25bad5..e9991c688d 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -71,6 +71,11 @@ <artifactId>seata-tm</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.seata</groupId> + <artifactId>seata-serializer-protobuf</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-alts</artifactId> diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java index 0d63d2eb70..042160a9ba 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -16,6 +16,7 @@ */ package org.apache.seata.core.rpc.netty.mockserver; +import com.google.protobuf.Any; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; @@ -25,6 +26,8 @@ import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.protocol.generated.GrpcMessageProto; import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; +import org.apache.seata.core.rpc.netty.grpc.GrpcHeaderEnum; +import org.apache.seata.core.serializer.SerializerType; import org.apache.seata.mockserver.MockServer; import org.apache.seata.serializer.protobuf.generated.*; import org.apache.seata.core.protocol.generated.SeataServiceGrpc; @@ -69,7 +72,7 @@ public class GrpcTest { .setAbstractIdentifyRequest(abstractIdentifyRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(registerTMRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(registerTMRequestProto).toByteString()).build(); } private GrpcMessageProto getGlobalBeginRequest() { @@ -77,7 +80,7 @@ public class GrpcTest { .setTransactionName("test-transaction") .setTimeout(2000) .build(); - return GrpcMessageProto.newBuilder().setBody(globalBeginRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalBeginRequestProto).toByteString()).build(); } private GrpcMessageProto getBranchRegisterRequest() { @@ -89,7 +92,7 @@ public class GrpcTest { .setApplicationData("{\"mock\":\"mock\"}") .build(); - return GrpcMessageProto.newBuilder().setBody(branchRegisterRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(branchRegisterRequestProto).toByteString()).build(); } private GrpcMessageProto getGlobalCommitRequest() { @@ -100,7 +103,7 @@ public class GrpcTest { .setAbstractGlobalEndRequest(globalEndRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(globalCommitRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalCommitRequestProto).toByteString()).build(); } private GrpcMessageProto getGlobalRollbackRequest() { @@ -111,7 +114,7 @@ public class GrpcTest { .setAbstractGlobalEndRequest(globalEndRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(globalRollbackRequestProto.toByteString()).build(); + return GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalRollbackRequestProto).toByteString()).build(); } @Test diff --git a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer similarity index 93% copy from serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer copy to test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer index 71098c5367..81c5235e25 100644 --- a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer +++ b/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.seata.serializer.protobuf.ProtobufSerializer \ No newline at end of file +org.apache.seata.serializer.protobuf.GrpcSerializer \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org