This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch cloud-native in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 4b71c84e215219355e75d96023c345c8467a9f34 Author: ken.lj <ken.lj...@gmail.com> AuthorDate: Wed Aug 28 17:34:54 2019 +0800 gRPC framework support --- dubbo-all/pom.xml | 8 + dubbo-bom/pom.xml | 5 + .../dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml | 4 + dubbo-demo/dubbo-demo-xml/pom.xml | 8 + dubbo-dependencies-bom/pom.xml | 17 ++ dubbo-rpc/{ => dubbo-rpc-grpc}/pom.xml | 56 +++--- .../rpc/protocol/grpc/DubboHandlerRegistry.java | 70 ++++++++ .../dubbo/rpc/protocol/grpc/GrpcProtocol.java | 200 +++++++++++++++++++++ .../dubbo/internal/org.apache.dubbo.rpc.Protocol | 1 + dubbo-rpc/pom.xml | 1 + pom.xml | 24 +-- 11 files changed, 360 insertions(+), 34 deletions(-) diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index 35790a4..bbacb73 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -221,6 +221,13 @@ </dependency> <dependency> <groupId>org.apache.dubbo</groupId> + <artifactId>dubbo-rpc-grpc</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-registry-api</artifactId> <version>${project.version}</version> <scope>compile</scope> @@ -593,6 +600,7 @@ <include>org.apache.dubbo:dubbo-rpc-redis</include> <include>org.apache.dubbo:dubbo-rpc-rest</include> <include>org.apache.dubbo:dubbo-rpc-xml</include> + <include>org.apache.dubbo:dubbo-rpc-grpc</include> <include>org.apache.dubbo:dubbo-filter-validation</include> <include>org.apache.dubbo:dubbo-filter-cache</include> <include>org.apache.dubbo:dubbo-cluster</include> diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index 0b0bd1a..6ccbc1b 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -230,6 +230,11 @@ </dependency> <dependency> <groupId>org.apache.dubbo</groupId> + <artifactId>dubbo-rpc-grpc</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-registry-api</artifactId> 
<version>${project.version}</version> </dependency> diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml index 63ed1b8..b3a4acc 100644 --- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml +++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/pom.xml @@ -32,6 +32,10 @@ <dependencies> <dependency> <groupId>org.apache.dubbo</groupId> + <artifactId>dubbo-metadata-report-zookeeper</artifactId> + </dependency> + <dependency> + <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-demo-interface</artifactId> <version>${project.parent.version}</version> </dependency> diff --git a/dubbo-demo/dubbo-demo-xml/pom.xml b/dubbo-demo/dubbo-demo-xml/pom.xml index 858d359..8438d25 100644 --- a/dubbo-demo/dubbo-demo-xml/pom.xml +++ b/dubbo-demo/dubbo-demo-xml/pom.xml @@ -40,6 +40,14 @@ <module>dubbo-demo-xml-consumer</module> </modules> + <dependencies> + <dependency> + <groupId>org.apache.dubbo</groupId> + <artifactId>dubbo-metadata-report-zookeeper</artifactId> + <version>${project.parent.version}</version> + </dependency> + </dependencies> + <build> <plugins> <plugin> diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index c335104..8cd3ca8 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -130,6 +130,7 @@ <tomcat_embed_version>8.5.31</tomcat_embed_version> <jetcd_version>0.3.0</jetcd_version> <nacos_version>1.1.1</nacos_version> + <grpc.version>1.22.1</grpc.version> <!-- Log libs --> <slf4j_version>1.7.25</slf4j_version> <jcl_version>1.2</jcl_version> @@ -646,6 +647,22 @@ <artifactId>nacos-client</artifactId> <version>${nacos_version}</version> </dependency> + <!-- grpc related dependencies --> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + 
<version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> </dependencies> </dependencyManagement> diff --git a/dubbo-rpc/pom.xml b/dubbo-rpc/dubbo-rpc-grpc/pom.xml similarity index 51% copy from dubbo-rpc/pom.xml copy to dubbo-rpc/dubbo-rpc-grpc/pom.xml index 1388b50..53f6978 100644 --- a/dubbo-rpc/pom.xml +++ b/dubbo-rpc/dubbo-rpc-grpc/pom.xml @@ -18,31 +18,43 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.dubbo</groupId> - <artifactId>dubbo-parent</artifactId> + <artifactId>dubbo-rpc</artifactId> <version>${revision}</version> - <relativePath>../pom.xml</relativePath> </parent> - <artifactId>dubbo-rpc</artifactId> - <packaging>pom</packaging> + <artifactId>dubbo-rpc-grpc</artifactId> + <packaging>jar</packaging> <name>${project.artifactId}</name> - <description>The rpc module of dubbo project</description> + <description>The gRPC integration module</description> <properties> <skip_maven_deploy>false</skip_maven_deploy> </properties> - <modules> - <module>dubbo-rpc-api</module> - <module>dubbo-rpc-dubbo</module> - <module>dubbo-rpc-injvm</module> - <module>dubbo-rpc-jsonrpc</module> - <module>dubbo-rpc-rmi</module> - <module>dubbo-rpc-hessian</module> - <module>dubbo-rpc-http</module> - <module>dubbo-rpc-webservice</module> - <module>dubbo-rpc-native-thrift</module> - <module>dubbo-rpc-thrift</module> - <module>dubbo-rpc-memcached</module> - <module>dubbo-rpc-redis</module> - <module>dubbo-rpc-rest</module> - <module>dubbo-rpc-xml</module> - </modules> -</project> + <dependencies> + <dependency> + <groupId>org.apache.dubbo</groupId> + <artifactId>dubbo-rpc-api</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.dubbo</groupId> + <artifactId>dubbo-remoting-http</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + 
<groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-test</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/DubboHandlerRegistry.java b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/DubboHandlerRegistry.java new file mode 100644 index 0000000..aa9a17b --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/DubboHandlerRegistry.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dubbo.rpc.protocol.grpc; + +import io.grpc.BindableService; +import io.grpc.HandlerRegistry; +import io.grpc.ServerMethodDefinition; +import io.grpc.ServerServiceDefinition; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + */ +public class DubboHandlerRegistry extends HandlerRegistry { + + private final Map<String, ServerServiceDefinition> services = new ConcurrentHashMap<>(); + private final Map<String, ServerMethodDefinition<?, ?>> methods = new ConcurrentHashMap<>(); + + public DubboHandlerRegistry() {} + + /** + * Returns the service definitions in this registry. + */ + @Override + public List<ServerServiceDefinition> getServices() { + return Collections.unmodifiableList(new ArrayList<>(services.values())); + } + + @Nullable + @Override + public ServerMethodDefinition<?, ?> lookupMethod(String methodName, @Nullable String authority) { + // TODO (carl-mastrangelo): honor authority header. 
+ return methods.get(methodName); + } + + void addService(BindableService bindableService, String key) { + ServerServiceDefinition service = bindableService.bindService(); + services.put(key, service); + for (ServerMethodDefinition<?, ?> method : service.getMethods()) { + methods.put(method.getMethodDescriptor().getFullMethodName(), method); + } + } + + void removeService(String serviceKey) { + ServerServiceDefinition service = services.remove(serviceKey); + for (ServerMethodDefinition<?, ?> method : service.getMethods()) { + methods.remove(method.getMethodDescriptor().getFullMethodName(), method); + } + } +} diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java new file mode 100644 index 0000000..5d78cf9 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcProtocol.java @@ -0,0 +1,200 @@ +package org.apache.dubbo.rpc.protocol.grpc;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.Exporter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.protocol.AbstractProxyProtocol; + +import io.grpc.BindableService; +import io.grpc.Channel; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.dubbo.rpc.Constants.INTERFACES; + +/** + * + */ +public class GrpcProtocol extends AbstractProxyProtocol { + + private static final Logger logger = LoggerFactory.getLogger(GrpcProtocol.class); + + public final static int DEFAULT_PORT = 50051; + + private final Map<String, GrpcServer> serverMap = new ConcurrentHashMap<>(); + + private final Map<String, ManagedChannel> channelMap = new ConcurrentHashMap<>(); + + /** + * 传进来的impl implements DubboInterface, DubboInterface包含特定的3个通用方法就可以了 + * @param impl + * @param type + * @param url + * @param <T> + * @return + * @throws RpcException + */ + @Override + protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException { + String key = url.getAddress(); + GrpcServer grpcServer = serverMap.computeIfAbsent(key, k -> { + DubboHandlerRegistry registry = new DubboHandlerRegistry(); + Server originalServer = ServerBuilder + .forPort(url.getPort()) + .fallbackHandlerRegistry(registry) + .build(); + GrpcServer server = new GrpcServer(originalServer, registry); + return server; + }); + + grpcServer.getRegistry().addService((BindableService) impl, url.getServiceKey()); + 
+ return () -> grpcServer.getRegistry().removeService(url.getServiceKey()); + } + + @Override + public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { + return super.export(new GrpcInvoker<>(invoker)); + } + + /** + * 这里返回的impl必须要有所有的方法, Stub BlockingStub FutureStub + * @param type + * @param url + * @param <T> + * @return + * @throws RpcException + */ + @Override + protected <T> T doRefer(Class<T> type, URL url) throws RpcException { + Class<?> enclosingClass = type.getEnclosingClass(); + + if (enclosingClass == null) { + throw new IllegalArgumentException(type.getName() + " must be declared inside protobuf generated classes, " + + "should be something like ServiceNameGrpc.IServiceName."); + } + + final Method dubboStubMethod; + try { + dubboStubMethod = enclosingClass.getDeclaredMethod("getDubboStub", Channel.class); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("Does not find getDubboStub in " + enclosingClass.getName() + ", please use the customized protoc-gen-grpc-dubbo-java to update the generated classes."); + } + + Channel channel = channelMap.computeIfAbsent(url.getServiceKey(), + k -> ManagedChannelBuilder.forAddress(url.getHost(), url.getPort()).usePlaintext(true).build() + ); + + try { + @SuppressWarnings("unchecked") + final T stub = (T) dubboStubMethod.invoke(null, channel); + return stub; + } catch (IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException("Could not create stub through reflection.", e); + } + } + + @Override + public int getDefaultPort() { + return DEFAULT_PORT; + } + + @Override + public void destroy() { + // FIXME + } + + private class GrpcServer { + private Server server; + private DubboHandlerRegistry registry; + + public GrpcServer(Server server, DubboHandlerRegistry registry) { + try { + server.start(); + } catch (IOException e) { + throw new IllegalStateException("Failed to start gRPC server.", e); + } + this.server = server; + this.registry = 
registry; + } + + public Server getServer() { + return server; + } + + public DubboHandlerRegistry getRegistry() { + return registry; + } + } + + private class GrpcInvoker<T> implements Invoker<T> { + + private Invoker<T> invoker; + + public GrpcInvoker(Invoker<T> invoker) { + this.invoker = invoker; + } + + @Override + public Class<T> getInterface() { + return invoker.getInterface(); + } + + @Override + public Result invoke(Invocation invocation) throws RpcException { + return invoker.invoke(invocation); + } + + @Override + public URL getUrl() { + URL url = invoker.getUrl(); + String interfaces = url.getParameter(INTERFACES); + if (StringUtils.isNotEmpty(interfaces)) { + interfaces += ("," + BindableService.class.getName()); + } else { + interfaces = BindableService.class.getName(); + } + return url.addParameter(INTERFACES, interfaces); + } + + @Override + public boolean isAvailable() { + return invoker.isAvailable(); + } + + @Override + public void destroy() { + invoker.destroy(); + } + } + +} diff --git a/dubbo-rpc/dubbo-rpc-grpc/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol b/dubbo-rpc/dubbo-rpc-grpc/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol new file mode 100644 index 0000000..f7acbba --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-grpc/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol @@ -0,0 +1 @@ +grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol \ No newline at end of file diff --git a/dubbo-rpc/pom.xml b/dubbo-rpc/pom.xml index 1388b50..109338f 100644 --- a/dubbo-rpc/pom.xml +++ b/dubbo-rpc/pom.xml @@ -44,5 +44,6 @@ <module>dubbo-rpc-redis</module> <module>dubbo-rpc-rest</module> <module>dubbo-rpc-xml</module> + <module>dubbo-rpc-grpc</module> </modules> </project> diff --git a/pom.xml b/pom.xml index f3df034..e93d6eb 100644 --- a/pom.xml +++ b/pom.xml @@ -284,6 +284,18 @@ <build> <plugins> <plugin> + <artifactId>maven-source-plugin</artifactId> + 
<version>${maven_source_version}</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> <artifactId>maven-javadoc-plugin</artifactId> <version>${maven_javadoc_version}</version> <executions> @@ -390,18 +402,6 @@ </resources> <plugins> <plugin> - <artifactId>maven-source-plugin</artifactId> - <version>${maven_source_version}</version> - <executions> - <execution> - <id>attach-sources</id> - <goals> - <goal>jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> <artifactId>maven-jar-plugin</artifactId> <version>${maven_jar_version}</version> <configuration>