This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 60a81b1e93 feature: add grpc serializer (#6992)
60a81b1e93 is described below

commit 60a81b1e93b6e2833df99c89ee21c5687512a2e7
Author: yiqi <77573225+pleasegivemethec...@users.noreply.github.com>
AuthorDate: Sun Nov 10 21:02:06 2024 +0800

    feature: add grpc serializer (#6992)
---
 changes/en-us/2.x.md                               |  1 +
 changes/zh-cn/2.x.md                               |  1 +
 .../seata/core/rpc/netty/grpc/GrpcDecoder.java     |  2 +-
 .../seata/core/rpc/netty/grpc/GrpcEncoder.java     |  4 +-
 .../seata/core/serializer/SerializerType.java      |  9 +++-
 .../seata/serializer/protobuf/GrpcSerializer.java  | 60 ++++++++++++++++++++++
 .../org.apache.seata.core.serializer.Serializer    |  3 +-
 test/pom.xml                                       |  5 ++
 .../seata/core/rpc/netty/mockserver/GrpcTest.java  | 13 +++--
 .../org.apache.seata.core.serializer.Serializer    |  2 +-
 10 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 8ba5550332..cfd89f80ce 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -8,6 +8,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#6881](https://github.com/apache/incubator-seata/pull/6881)] support grpc
 - [[#6864](https://github.com/apache/incubator-seata/pull/6864)] support 
shentong database
 - [[#6974](https://github.com/apache/incubator-seata/pull/6974)] support 
fastjson2 undolog parser
+- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] support grpc 
serializer
 
 
 ### bugfix:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 915a039cbb..6aeb138b52 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -8,6 +8,7 @@
 - [[#6881](https://github.com/apache/incubator-seata/pull/6881)] 
client和server支持grpc协议
 - [[#6864](https://github.com/apache/incubator-seata/pull/6864)] 支持神通数据库(oscar)
 - [[#6974](https://github.com/apache/incubator-seata/pull/6974)] 
支持UndoLog的fastjson2序列化方式
+- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] 支持grpc序列化器
 
 
 ### bugfix:
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java
index e227d5dc7c..5544e994a5 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java
@@ -94,7 +94,7 @@ public class GrpcDecoder extends ChannelDuplexHandler {
                         bodyBytes = compressor.decompress(bodyBytes);
                     }
                     String codecValue = 
headMap.get(GrpcHeaderEnum.CODEC_TYPE.header);
-                    int codec = StringUtils.isBlank(codecValue) ? 
SerializerType.PROTOBUF.getCode()
+                    int codec = StringUtils.isBlank(codecValue) ? 
SerializerType.GRPC.getCode()
                         : Integer.parseInt(codecValue);
                     SerializerType serializerType = 
SerializerType.getByCode(codec);
                     rpcMsg.setCodec(serializerType.getCode());
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java
index dbbbfe1be4..2601a2f0a6 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java
@@ -64,14 +64,14 @@ public class GrpcEncoder extends 
ChannelOutboundHandlerAdapter {
         ByteString dataBytes;
         if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
                 && messageType != 
ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
-            Serializer serializer = 
SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.PROTOBUF.getCode()));
+            Serializer serializer = 
SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()));
             byte[] serializedBytes = serializer.serialize(body);
             Compressor compressor = 
CompressorFactory.getCompressor(rpcMessage.getCompressor());
             dataBytes = 
ByteString.copyFrom(compressor.compress(serializedBytes));
         } else {
             dataBytes = ByteString.EMPTY;
         }
-        headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, 
String.valueOf(SerializerType.PROTOBUF.getCode()));
+        headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, 
String.valueOf(SerializerType.GRPC.getCode()));
         headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, 
String.valueOf(rpcMessage.getCompressor()));
         GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder()
                 .putAllHeadMap(headMap)
diff --git 
a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java 
b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java
index 56fd8136d1..39772b34ce 100644
--- a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java
+++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java
@@ -70,7 +70,14 @@ public enum SerializerType {
      * Math.pow(2, 6)
      */
     FASTJSON2((byte)0x64),
-    ;
+
+
+    /**
+     * The grpc
+     * <p>
+     * NOTE(review): 0x128 is a hex literal (296), not Math.pow(2, 7); the (byte) cast truncates it to 0x28 (40).
+     */
+    GRPC((byte) 0x128);
 
     private final byte code;
 
diff --git 
a/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java
 
b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java
new file mode 100644
index 0000000000..2ef8eac784
--- /dev/null
+++ 
b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.serializer.protobuf;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Message;
+import org.apache.seata.common.exception.ShouldNeverHappenException;
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.core.serializer.Serializer;
+import org.apache.seata.serializer.protobuf.convertor.PbConvertor;
+import org.apache.seata.serializer.protobuf.manager.ProtobufConvertManager;
+
+@LoadLevel(name = "GRPC")
+public class GrpcSerializer implements Serializer { // Serializer whose byte form is a protobuf Any wrapping the converted message
+    @Override
+    public <T> byte[] serialize(T t) { // model object -> proto (via its PbConvertor) -> Any -> bytes; t must be non-null because t.getClass() is called
+        PbConvertor pbConvertor = ProtobufConvertManager.getInstance()
+                .fetchConvertor(t.getClass().getName());
+        Any grpcBody = Any.pack((Message) pbConvertor.convert2Proto(t)); // cast assumes the convertor always yields a protobuf Message - TODO confirm
+
+        return grpcBody.toByteArray();
+    }
+
+    @Override
+    public <T> T deserialize(byte[] bytes) { // bytes -> Any -> concrete proto -> model object; returns null when the payload type does not match
+        try {
+            Any body = Any.parseFrom(bytes);
+            final Class clazz = 
ProtobufConvertManager.getInstance().fetchProtoClass(getTypeNameFromTypeUrl(body.getTypeUrl())); // proto class is looked up by the last path segment of the Any type URL
+            if (body.is(clazz)) { // unpack only when the Any payload is of the resolved class
+                Object ob = body.unpack(clazz);
+                PbConvertor pbConvertor = 
ProtobufConvertManager.getInstance().fetchReversedConvertor(clazz.getName());
+
+                return (T) pbConvertor.convert2Model(ob);
+            }
+        } catch (Throwable e) { // anything thrown inside the try block is rethrown wrapped, with the cause preserved
+            throw new ShouldNeverHappenException("GrpcSerializer deserialize 
error", e);
+        }
+
+        return null; // reached only when body.is(clazz) was false
+    }
+
+    private String getTypeNameFromTypeUrl(String typeUri) { // returns the text after the last '/', or "" when there is no '/'
+        int pos = typeUri.lastIndexOf('/');
+        return pos == -1 ? "" : typeUri.substring(pos + 1);
+    }
+}
diff --git 
a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
 
b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
index 71098c5367..f6fbf709de 100644
--- 
a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
+++ 
b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
@@ -14,4 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-org.apache.seata.serializer.protobuf.ProtobufSerializer
\ No newline at end of file
+org.apache.seata.serializer.protobuf.ProtobufSerializer
+org.apache.seata.serializer.protobuf.GrpcSerializer
\ No newline at end of file
diff --git a/test/pom.xml b/test/pom.xml
index d35f25bad5..e9991c688d 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -71,6 +71,11 @@
             <artifactId>seata-tm</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seata</groupId>
+            <artifactId>seata-serializer-protobuf</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.grpc</groupId>
             <artifactId>grpc-alts</artifactId>
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java 
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java
index 0d63d2eb70..042160a9ba 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.seata.core.rpc.netty.mockserver;
 
+import com.google.protobuf.Any;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
@@ -25,6 +26,8 @@ import org.apache.seata.config.ConfigurationFactory;
 import org.apache.seata.core.protocol.generated.GrpcMessageProto;
 import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
 import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
+import org.apache.seata.core.rpc.netty.grpc.GrpcHeaderEnum;
+import org.apache.seata.core.serializer.SerializerType;
 import org.apache.seata.mockserver.MockServer;
 import org.apache.seata.serializer.protobuf.generated.*;
 import org.apache.seata.core.protocol.generated.SeataServiceGrpc;
@@ -69,7 +72,7 @@ public class GrpcTest {
                 .setAbstractIdentifyRequest(abstractIdentifyRequestProto)
                 .build();
 
-        return 
GrpcMessageProto.newBuilder().setBody(registerTMRequestProto.toByteString()).build();
+        return 
GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, 
String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(registerTMRequestProto).toByteString()).build();
     }
 
     private GrpcMessageProto getGlobalBeginRequest() {
@@ -77,7 +80,7 @@ public class GrpcTest {
                 .setTransactionName("test-transaction")
                 .setTimeout(2000)
                 .build();
-        return 
GrpcMessageProto.newBuilder().setBody(globalBeginRequestProto.toByteString()).build();
+        return 
GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, 
String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalBeginRequestProto).toByteString()).build();
     }
 
     private GrpcMessageProto getBranchRegisterRequest() {
@@ -89,7 +92,7 @@ public class GrpcTest {
                 .setApplicationData("{\"mock\":\"mock\"}")
                 .build();
 
-        return 
GrpcMessageProto.newBuilder().setBody(branchRegisterRequestProto.toByteString()).build();
+        return 
GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, 
String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(branchRegisterRequestProto).toByteString()).build();
     }
 
     private GrpcMessageProto getGlobalCommitRequest() {
@@ -100,7 +103,7 @@ public class GrpcTest {
                 .setAbstractGlobalEndRequest(globalEndRequestProto)
                 .build();
 
-        return 
GrpcMessageProto.newBuilder().setBody(globalCommitRequestProto.toByteString()).build();
+        return 
GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, 
String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalCommitRequestProto).toByteString()).build();
     }
 
     private GrpcMessageProto getGlobalRollbackRequest() {
@@ -111,7 +114,7 @@ public class GrpcTest {
                 .setAbstractGlobalEndRequest(globalEndRequestProto)
                 .build();
 
-        return 
GrpcMessageProto.newBuilder().setBody(globalRollbackRequestProto.toByteString()).build();
+        return 
GrpcMessageProto.newBuilder().putHeadMap(GrpcHeaderEnum.CODEC_TYPE.header, 
String.valueOf(SerializerType.GRPC.getCode())).setBody(Any.pack(globalRollbackRequestProto).toByteString()).build();
     }
 
     @Test
diff --git 
a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
 
b/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
similarity index 93%
copy from 
serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
copy to 
test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
index 71098c5367..81c5235e25 100644
--- 
a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
+++ 
b/test/src/test/resources/META-INF/services/org.apache.seata.core.serializer.Serializer
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-org.apache.seata.serializer.protobuf.ProtobufSerializer
\ No newline at end of file
+org.apache.seata.serializer.protobuf.GrpcSerializer
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to