This is an automated email from the ASF dual-hosted git repository. tanjian pushed a commit to branch auth_fix in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit ca2adf3e12abc6e5b2cb657e61e9219f0d7306fb Author: JaredTan95 <[email protected]> AuthorDate: Sat Aug 29 21:35:09 2020 +0800 fix auth in sharing server. --- .../core/server/GRPCHandlerRegisterImpl.java | 2 +- .../oap/server/library/server/grpc/GRPCServer.java | 33 ++++++++++++++-------- .../server/SharingServerModuleProvider.java | 15 ++++++++-- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java index 2080701..f75f355 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java @@ -43,6 +43,6 @@ public class GRPCHandlerRegisterImpl implements GRPCHandlerRegister { @Override public void addFilter(ServerInterceptor interceptor) { - + server.addHandler(interceptor); } } diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java index d2eb9ac..2c0dd6c 100644 --- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.library.server.grpc; import io.grpc.BindableService; +import io.grpc.ServerInterceptor; import io.grpc.ServerServiceDefinition; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyServerBuilder; @@ -33,15 +34,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.library.server.Server; import org.apache.skywalking.oap.server.library.server.ServerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Slf4j public class GRPCServer implements Server { - private static final Logger LOGGER = LoggerFactory.getLogger(GRPCServer.class); - private final String host; private final int port; private int maxConcurrentCallsPerConnection; @@ -104,19 +103,22 @@ public class GRPCServer implements Server { public void initialize() { InetSocketAddress address = new InetSocketAddress(host, port); ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(threadPoolQueueSize); - ExecutorService executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue, new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler()); + ExecutorService executor = new ThreadPoolExecutor( + threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue, + new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler() + ); nettyServerBuilder = NettyServerBuilder.forAddress(address); nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection) .maxInboundMessageSize(maxMessageSize) .executor(executor); - LOGGER.info("Server started, host {} listening on {}", host, port); + log.info("Server started, host {} listening on {}", host, port); } static class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - LOGGER.warn("Grpc server thread pool is full, rejecting the task"); + log.warn("Grpc server thread pool is full, rejecting the task"); } } @@ -124,8 +126,9 @@ public class GRPCServer implements Server { public void start() throws ServerException { try { if (sslContextBuilder != null) { - nettyServerBuilder = nettyServerBuilder.sslContext(GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL) - .build()); + nettyServerBuilder = nettyServerBuilder.sslContext( + GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL) + .build()); } server = nettyServerBuilder.build(); server.start(); @@ -135,15 +138,20 @@ public class GRPCServer implements Server { } public void addHandler(BindableService handler) { - LOGGER.info("Bind handler {} into gRPC server {}:{}", handler.getClass().getSimpleName(), host, port); + log.info("Bind handler {} into gRPC server {}:{}", handler.getClass().getSimpleName(), host, port); nettyServerBuilder.addService(handler); } public void addHandler(ServerServiceDefinition definition) { - LOGGER.info("Bind handler {} into gRPC server {}:{}", definition.getClass().getSimpleName(), host, port); + log.info("Bind handler {} into gRPC server {}:{}", definition.getClass().getSimpleName(), host, port); nettyServerBuilder.addService(definition); } + public void addHandler(ServerInterceptor serverInterceptor) { + log.info("Bind interceptor {} into gRPC server {}:{}", serverInterceptor.getClass().getSimpleName(), host, port); + nettyServerBuilder.intercept(serverInterceptor); + } + @Override public boolean isSSLOpen() { return sslContextBuilder == null; @@ -156,7 +164,8 @@ public class GRPCServer implements Server { if (target == null || getClass() != target.getClass()) return false; GRPCServer that = (GRPCServer) target; - return port == that.port && Objects.equals(host, that.host) && Objects.equals(certChainFile, that.certChainFile) && Objects + return port == that.port && Objects.equals(host, that.host) && Objects.equals( + certChainFile, that.certChainFile) && Objects .equals(privateKeyFile, that.privateKeyFile); } } diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java index 5091b7b..7c3fabf 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java @@ -45,6 +45,7 @@ public class SharingServerModuleProvider extends ModuleProvider { private JettyServer jettyServer; private ReceiverGRPCHandlerRegister receiverGRPCHandlerRegister; private ReceiverJettyHandlerRegister receiverJettyHandlerRegister; + private AuthenticationInterceptor authenticationInterceptor; public SharingServerModuleProvider() { super(); @@ -92,6 +93,10 @@ public class SharingServerModuleProvider extends ModuleProvider { this.registerServiceImplementation(JettyHandlerRegister.class, receiverJettyHandlerRegister); } + if (StringUtil.isNotEmpty(config.getAuthentication())) { + authenticationInterceptor = new AuthenticationInterceptor(config.getAuthentication()); + } + if (config.getGRPCPort() != 0) { if (config.isGRPCSslEnabled()) { grpcServer = new GRPCServer( @@ -120,11 +125,15 @@ public class SharingServerModuleProvider extends ModuleProvider { } grpcServer.initialize(); - this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer)); + GRPCHandlerRegisterImpl grpcHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer); + if (Objects.nonNull(authenticationInterceptor)) { + grpcHandlerRegister.addFilter(authenticationInterceptor); + } + this.registerServiceImplementation(GRPCHandlerRegister.class, grpcHandlerRegister); } else { this.receiverGRPCHandlerRegister = new ReceiverGRPCHandlerRegister(); - if (StringUtil.isNotEmpty(config.getAuthentication())) { - receiverGRPCHandlerRegister.addFilter(new AuthenticationInterceptor(config.getAuthentication())); + if (Objects.nonNull(authenticationInterceptor)) { + receiverGRPCHandlerRegister.addFilter(authenticationInterceptor); } this.registerServiceImplementation(GRPCHandlerRegister.class, receiverGRPCHandlerRegister); }
