This is an automated email from the ASF dual-hosted git repository.
tanjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 22bf8e6 fix auth in sharing server. (#5411)
22bf8e6 is described below
commit 22bf8e6a61e0233ca5a01fe7f82451e08973df76
Author: Jared Tan <[email protected]>
AuthorDate: Sun Aug 30 08:20:53 2020 +0800
fix auth in sharing server. (#5411)
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
.../core/server/GRPCHandlerRegisterImpl.java | 2 +-
.../oap/server/library/server/grpc/GRPCServer.java | 33 ++++++++++++++--------
.../server/SharingServerModuleProvider.java | 15 ++++++++--
3 files changed, 34 insertions(+), 16 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
index 2080701..f75f355 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
@@ -43,6 +43,6 @@ public class GRPCHandlerRegisterImpl implements
GRPCHandlerRegister {
@Override
public void addFilter(ServerInterceptor interceptor) {
-
+ server.addHandler(interceptor);
}
}
diff --git
a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
index d2eb9ac..2c0dd6c 100644
---
a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
+++
b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.library.server.grpc;
import io.grpc.BindableService;
+import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
@@ -33,15 +34,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.server.Server;
import org.apache.skywalking.oap.server.library.server.ServerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class GRPCServer implements Server {
- private static final Logger LOGGER =
LoggerFactory.getLogger(GRPCServer.class);
-
private final String host;
private final int port;
private int maxConcurrentCallsPerConnection;
@@ -104,19 +103,22 @@ public class GRPCServer implements Server {
public void initialize() {
InetSocketAddress address = new InetSocketAddress(host, port);
ArrayBlockingQueue blockingQueue = new
ArrayBlockingQueue(threadPoolQueueSize);
- ExecutorService executor = new ThreadPoolExecutor(threadPoolSize,
threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue, new
CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler());
+ ExecutorService executor = new ThreadPoolExecutor(
+ threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS,
blockingQueue,
+ new CustomThreadFactory("grpcServerPool"), new
CustomRejectedExecutionHandler()
+ );
nettyServerBuilder = NettyServerBuilder.forAddress(address);
nettyServerBuilder =
nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection)
.maxInboundMessageSize(maxMessageSize)
.executor(executor);
- LOGGER.info("Server started, host {} listening on {}", host, port);
+ log.info("Server started, host {} listening on {}", host, port);
}
static class CustomRejectedExecutionHandler implements
RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
- LOGGER.warn("Grpc server thread pool is full, rejecting the task");
+ log.warn("Grpc server thread pool is full, rejecting the task");
}
}
@@ -124,8 +126,9 @@ public class GRPCServer implements Server {
public void start() throws ServerException {
try {
if (sslContextBuilder != null) {
- nettyServerBuilder =
nettyServerBuilder.sslContext(GrpcSslContexts.configure(sslContextBuilder,
SslProvider.OPENSSL)
-
.build());
+ nettyServerBuilder = nettyServerBuilder.sslContext(
+ GrpcSslContexts.configure(sslContextBuilder,
SslProvider.OPENSSL)
+ .build());
}
server = nettyServerBuilder.build();
server.start();
@@ -135,15 +138,20 @@ public class GRPCServer implements Server {
}
public void addHandler(BindableService handler) {
- LOGGER.info("Bind handler {} into gRPC server {}:{}",
handler.getClass().getSimpleName(), host, port);
+ log.info("Bind handler {} into gRPC server {}:{}",
handler.getClass().getSimpleName(), host, port);
nettyServerBuilder.addService(handler);
}
public void addHandler(ServerServiceDefinition definition) {
- LOGGER.info("Bind handler {} into gRPC server {}:{}",
definition.getClass().getSimpleName(), host, port);
+ log.info("Bind handler {} into gRPC server {}:{}",
definition.getClass().getSimpleName(), host, port);
nettyServerBuilder.addService(definition);
}
+ public void addHandler(ServerInterceptor serverInterceptor) {
+ log.info("Bind interceptor {} into gRPC server {}:{}",
serverInterceptor.getClass().getSimpleName(), host, port);
+ nettyServerBuilder.intercept(serverInterceptor);
+ }
+
@Override
public boolean isSSLOpen() {
return sslContextBuilder == null;
@@ -156,7 +164,8 @@ public class GRPCServer implements Server {
if (target == null || getClass() != target.getClass())
return false;
GRPCServer that = (GRPCServer) target;
- return port == that.port && Objects.equals(host, that.host) &&
Objects.equals(certChainFile, that.certChainFile) && Objects
+ return port == that.port && Objects.equals(host, that.host) &&
Objects.equals(
+ certChainFile, that.certChainFile) && Objects
.equals(privateKeyFile, that.privateKeyFile);
}
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
index 5091b7b..7c3fabf 100644
---
a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
+++
b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
@@ -45,6 +45,7 @@ public class SharingServerModuleProvider extends
ModuleProvider {
private JettyServer jettyServer;
private ReceiverGRPCHandlerRegister receiverGRPCHandlerRegister;
private ReceiverJettyHandlerRegister receiverJettyHandlerRegister;
+ private AuthenticationInterceptor authenticationInterceptor;
public SharingServerModuleProvider() {
super();
@@ -92,6 +93,10 @@ public class SharingServerModuleProvider extends
ModuleProvider {
this.registerServiceImplementation(JettyHandlerRegister.class,
receiverJettyHandlerRegister);
}
+ if (StringUtil.isNotEmpty(config.getAuthentication())) {
+ authenticationInterceptor = new
AuthenticationInterceptor(config.getAuthentication());
+ }
+
if (config.getGRPCPort() != 0) {
if (config.isGRPCSslEnabled()) {
grpcServer = new GRPCServer(
@@ -120,11 +125,15 @@ public class SharingServerModuleProvider extends
ModuleProvider {
}
grpcServer.initialize();
- this.registerServiceImplementation(GRPCHandlerRegister.class, new
GRPCHandlerRegisterImpl(grpcServer));
+ GRPCHandlerRegisterImpl grpcHandlerRegister = new
GRPCHandlerRegisterImpl(grpcServer);
+ if (Objects.nonNull(authenticationInterceptor)) {
+ grpcHandlerRegister.addFilter(authenticationInterceptor);
+ }
+ this.registerServiceImplementation(GRPCHandlerRegister.class,
grpcHandlerRegister);
} else {
this.receiverGRPCHandlerRegister = new
ReceiverGRPCHandlerRegister();
- if (StringUtil.isNotEmpty(config.getAuthentication())) {
- receiverGRPCHandlerRegister.addFilter(new
AuthenticationInterceptor(config.getAuthentication()));
+ if (Objects.nonNull(authenticationInterceptor)) {
+
receiverGRPCHandlerRegister.addFilter(authenticationInterceptor);
}
this.registerServiceImplementation(GRPCHandlerRegister.class,
receiverGRPCHandlerRegister);
}