This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch boot-seq in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit b595eafe462433d5f9eb4a675562e04f413a3c5b Author: Wu Sheng <[email protected]> AuthorDate: Mon Sep 16 15:03:42 2019 +0800 Make sure the cluster register happens before streaming process. --- .../oap/server/core/CoreModuleProvider.java | 11 +++++---- .../server/core/remote/RemoteSenderService.java | 27 +++++++++++++++++----- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index a188bef..86c611f 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -185,6 +185,12 @@ public class CoreModuleProvider extends ModuleProvider { } catch (IOException | IllegalAccessException | InstantiationException e) { throw new ModuleStartException(e.getMessage(), e); } + + if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) { + RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true)); + this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance); + } + } @Override public void notifyAfterCompleted() throws ModuleStartException { @@ -195,11 +201,6 @@ public class CoreModuleProvider extends ModuleProvider { throw new ModuleStartException(e.getMessage(), e); } - if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) { - RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true)); - this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance); - } - PersistenceTimer.INSTANCE.start(getManager(), moduleConfig); if (moduleConfig.isEnableDataKeeperExecutor()) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java index 436ecd2..61fd9d1 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java @@ -18,16 +18,25 @@ package org.apache.skywalking.oap.server.core.remote; +import java.util.List; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.remote.client.*; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClient; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; import org.apache.skywalking.oap.server.core.remote.data.StreamData; -import org.apache.skywalking.oap.server.core.remote.selector.*; -import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.core.remote.selector.ForeverFirstSelector; +import org.apache.skywalking.oap.server.core.remote.selector.HashCodeSelector; +import org.apache.skywalking.oap.server.core.remote.selector.RollingSelector; +import org.apache.skywalking.oap.server.core.remote.selector.Selector; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ public class RemoteSenderService implements Service { + private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class); private final ModuleManager moduleManager; private final HashCodeSelector hashCodeSelector; @@ -44,18 +53,24 @@ public class RemoteSenderService implements Service { public void send(String nextWorkName, StreamData streamData, Selector selector) { RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class); + List<RemoteClient> clientList = clientManager.getRemoteClient(); + if (clientList.size() == 0) { + logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized."); + return; + } + RemoteClient remoteClient; switch (selector) { case HashCode: - remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), streamData); + remoteClient = hashCodeSelector.select(clientList, streamData); remoteClient.push(nextWorkName, streamData); break; case Rolling: - remoteClient = rollingSelector.select(clientManager.getRemoteClient(), streamData); + remoteClient = rollingSelector.select(clientList, streamData); remoteClient.push(nextWorkName, streamData); break; case ForeverFirst: - remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), streamData); + remoteClient = foreverFirstSelector.select(clientList, streamData); remoteClient.push(nextWorkName, streamData); break; }
