This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch boot-seq
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit b595eafe462433d5f9eb4a675562e04f413a3c5b
Author: Wu Sheng <[email protected]>
AuthorDate: Mon Sep 16 15:03:42 2019 +0800

    Make sure the cluster registration happens before the streaming process starts.
---
 .../oap/server/core/CoreModuleProvider.java        | 11 +++++----
 .../server/core/remote/RemoteSenderService.java    | 27 +++++++++++++++++-----
 2 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index a188bef..86c611f 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -185,6 +185,12 @@ public class CoreModuleProvider extends ModuleProvider {
         } catch (IOException | IllegalAccessException | InstantiationException 
e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
+
+        if 
(CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || 
CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole()))
 {
+            RemoteInstance gRPCServerInstance = new RemoteInstance(new 
Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
+            
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
+        }
+
     }
 
     @Override public void notifyAfterCompleted() throws ModuleStartException {
@@ -195,11 +201,6 @@ public class CoreModuleProvider extends ModuleProvider {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        if 
(CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || 
CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole()))
 {
-            RemoteInstance gRPCServerInstance = new RemoteInstance(new 
Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
-            
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
-        }
-
         PersistenceTimer.INSTANCE.start(getManager(), moduleConfig);
 
         if (moduleConfig.isEnableDataKeeperExecutor()) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
index 436ecd2..61fd9d1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
@@ -18,16 +18,25 @@
 
 package org.apache.skywalking.oap.server.core.remote;
 
+import java.util.List;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.remote.client.*;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.core.remote.selector.*;
-import org.apache.skywalking.oap.server.library.module.*;
+import 
org.apache.skywalking.oap.server.core.remote.selector.ForeverFirstSelector;
+import org.apache.skywalking.oap.server.core.remote.selector.HashCodeSelector;
+import org.apache.skywalking.oap.server.core.remote.selector.RollingSelector;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
  */
 public class RemoteSenderService implements Service {
+    private static final Logger logger = 
LoggerFactory.getLogger(RemoteSenderService.class);
 
     private final ModuleManager moduleManager;
     private final HashCodeSelector hashCodeSelector;
@@ -44,18 +53,24 @@ public class RemoteSenderService implements Service {
     public void send(String nextWorkName, StreamData streamData, Selector 
selector) {
         RemoteClientManager clientManager = 
moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
 
+        List<RemoteClient> clientList = clientManager.getRemoteClient();
+        if (clientList.size() == 0) {
+            logger.warn("There is no available remote server for now, ignore 
the streaming data until the cluster metadata initialized.");
+            return;
+        }
+
         RemoteClient remoteClient;
         switch (selector) {
             case HashCode:
-                remoteClient = 
hashCodeSelector.select(clientManager.getRemoteClient(), streamData);
+                remoteClient = hashCodeSelector.select(clientList, streamData);
                 remoteClient.push(nextWorkName, streamData);
                 break;
             case Rolling:
-                remoteClient = 
rollingSelector.select(clientManager.getRemoteClient(), streamData);
+                remoteClient = rollingSelector.select(clientList, streamData);
                 remoteClient.push(nextWorkName, streamData);
                 break;
             case ForeverFirst:
-                remoteClient = 
foreverFirstSelector.select(clientManager.getRemoteClient(), streamData);
+                remoteClient = foreverFirstSelector.select(clientList, 
streamData);
                 remoteClient.push(nextWorkName, streamData);
                 break;
         }

Reply via email to