This is an automated email from the ASF dual-hosted git repository.

pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new d256fc3  OAP internal RemoteService protocol change and code refactor 
(#3128)
d256fc3 is described below

commit d256fc348e727549cebf6d4f42ba88458ab771ab
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Sat Jul 20 23:47:17 2019 +0800

    OAP internal RemoteService protocol change and code refactor (#3128)
    
    * Remove the worker id, and add worker name for remote handler only.
    
    * Remote metrics and inventory classes mapping too.
    
    * Refactor codes.
---
 .../skywalking/oap/server/core/CoreModule.java     | 39 ++++++++----
 .../oap/server/core/CoreModuleProvider.java        | 74 ++++++++++++++++------
 .../core/analysis/worker/MetricsRemoteWorker.java  | 11 ++--
 .../analysis/worker/MetricsStreamProcessor.java    | 35 ++++++----
 .../core/analysis/worker/MetricsTransWorker.java   | 15 +++--
 .../register/worker/InventoryStreamProcessor.java  | 33 ++++++----
 .../register/worker/RegisterPersistentWorker.java  | 20 ++++--
 .../core/register/worker/RegisterRemoteWorker.java |  8 +--
 .../server/core/remote/RemoteSenderService.java    |  8 +--
 .../server/core/remote/RemoteServiceHandler.java   | 44 +++++++------
 .../core/remote/client/GRPCRemoteClient.java       | 39 ++++++------
 .../server/core/remote/client/RemoteClient.java    |  2 +-
 .../core/remote/client/RemoteClientManager.java    | 40 ++++++------
 .../core/remote/client/SelfRemoteClient.java       | 11 ++--
 .../core/remote/define/StreamDataMapping.java      | 72 ---------------------
 .../remote/define/StreamDataMappingGetter.java     | 32 ----------
 .../oap/server/core/worker/AbstractWorker.java     |  5 --
 .../server/core/worker/IWorkerInstanceGetter.java  |  3 +-
 .../server/core/worker/IWorkerInstanceSetter.java  |  4 +-
 .../RemoteHandleWorker.java}                       | 14 ++--
 .../server/core/worker/WorkerInstancesService.java | 31 +++++----
 .../server-core/src/main/proto/RemoteService.proto |  3 +-
 .../core/remote/RemoteServiceHandlerTestCase.java  | 41 ++++++------
 .../remote/client/GRPCRemoteClientRealClient.java  | 28 +++-----
 .../remote/client/GRPCRemoteClientRealServer.java  |  6 +-
 .../remote/client/GRPCRemoteClientTestCase.java    | 38 +++++------
 .../remote/client/RemoteClientManagerTestCase.java | 23 ++++---
 .../core/storage/StorageInstallerTestCase.java     |  3 -
 28 files changed, 341 insertions(+), 341 deletions(-)

diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index f31445c..4bfef5a 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -18,18 +18,37 @@
 
 package org.apache.skywalking.oap.server.core;
 
-import java.util.*;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.config.*;
-import org.apache.skywalking.oap.server.core.query.*;
-import org.apache.skywalking.oap.server.core.register.service.*;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import 
org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import 
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import 
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.LogQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import 
org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
 import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
 import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
-import org.apache.skywalking.oap.server.core.remote.define.*;
-import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.storage.model.*;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
 /**
@@ -82,8 +101,6 @@ public class CoreModule extends ModuleDefine {
         classes.add(IModelSetter.class);
         classes.add(IModelGetter.class);
         classes.add(IModelOverride.class);
-        classes.add(StreamDataMappingGetter.class);
-        classes.add(StreamDataMappingSetter.class);
         classes.add(RemoteClientManager.class);
         classes.add(RemoteSenderService.class);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 2c8f78a..f05b776 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -20,26 +20,65 @@ package org.apache.skywalking.oap.server.core;
 
 import java.io.IOException;
 import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
+import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import 
org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import 
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import 
org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import 
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
 import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
 import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader;
-import org.apache.skywalking.oap.server.core.query.*;
-import org.apache.skywalking.oap.server.core.register.service.*;
-import org.apache.skywalking.oap.server.core.remote.*;
-import org.apache.skywalking.oap.server.core.remote.client.*;
-import org.apache.skywalking.oap.server.core.remote.define.*;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.LogQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import 
org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
 import 
org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
-import org.apache.skywalking.oap.server.core.server.*;
-import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
 import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
 import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
-import org.apache.skywalking.oap.server.core.worker.*;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import 
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
 import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
@@ -56,7 +95,6 @@ public class CoreModuleProvider extends ModuleProvider {
     private RemoteClientManager remoteClientManager;
     private final AnnotationScan annotationScan;
     private final StorageModels storageModels;
-    private final StreamDataMapping streamDataMapping;
     private final SourceReceiverImpl receiver;
     private StreamAnnotationListener streamAnnotationListener;
     private OALEngine oalEngine;
@@ -65,7 +103,6 @@ public class CoreModuleProvider extends ModuleProvider {
         super();
         this.moduleConfig = new CoreModuleConfig();
         this.annotationScan = new AnnotationScan();
-        this.streamDataMapping = new StreamDataMapping();
         this.storageModels = new StorageModels();
         this.receiver = new SourceReceiverImpl();
     }
@@ -129,9 +166,6 @@ public class CoreModuleProvider extends ModuleProvider {
 
         this.registerServiceImplementation(SourceReceiver.class, receiver);
 
-        this.registerServiceImplementation(StreamDataMappingGetter.class, 
streamDataMapping);
-        this.registerServiceImplementation(StreamDataMappingSetter.class, 
streamDataMapping);
-
         WorkerInstancesService instancesService = new WorkerInstancesService();
         this.registerServiceImplementation(IWorkerInstanceGetter.class, 
instancesService);
         this.registerServiceImplementation(IWorkerInstanceSetter.class, 
instancesService);
@@ -178,8 +212,6 @@ public class CoreModuleProvider extends ModuleProvider {
             annotationScan.scan();
 
             oalEngine.notifyAllListeners();
-
-            streamDataMapping.init();
         } catch (IOException | IllegalAccessException | InstantiationException 
e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
index 7ce9311..392d9b2 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
@@ -33,21 +33,18 @@ public class MetricsRemoteWorker extends 
AbstractWorker<Metrics> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(MetricsRemoteWorker.class);
 
-    private final AbstractWorker<Metrics> nextWorker;
     private final RemoteSenderService remoteSender;
-    private final String modelName;
+    private final String remoteReceiverWorkerName;
 
-    MetricsRemoteWorker(ModuleDefineHolder moduleDefineHolder, 
AbstractWorker<Metrics> nextWorker,
-        String modelName) {
+    MetricsRemoteWorker(ModuleDefineHolder moduleDefineHolder, String 
remoteReceiverWorkerName) {
         super(moduleDefineHolder);
         this.remoteSender = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
-        this.nextWorker = nextWorker;
-        this.modelName = modelName;
+        this.remoteReceiverWorkerName = remoteReceiverWorkerName;
     }
 
     @Override public final void in(Metrics metrics) {
         try {
-            remoteSender.send(nextWorker.getWorkerId(), metrics, 
Selector.HashCode);
+            remoteSender.send(remoteReceiverWorkerName, metrics, 
Selector.HashCode);
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
         }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index dfd4b20..78d0398 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -18,16 +18,26 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import lombok.Getter;
-import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -67,9 +77,6 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         IModelSetter modelSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         DownsamplingConfigService configService = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
 
-        StreamDataMappingSetter streamDataMappingSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
-        streamDataMappingSetter.putIfAbsent(metricsClass);
-
         MetricsPersistentWorker hourPersistentWorker = null;
         MetricsPersistentWorker dayPersistentWorker = null;
         MetricsPersistentWorker monthPersistentWorker = null;
@@ -91,13 +98,19 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         MetricsPersistentWorker minutePersistentWorker = 
minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
 
         MetricsTransWorker transWorker = new 
MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, 
hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
-        MetricsRemoteWorker remoteWorker = new 
MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name());
+
+        String remoteReceiverWorkerName = stream.name() + "_rec";
+        IWorkerInstanceSetter workerInstanceSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
+        workerInstanceSetter.put(remoteReceiverWorkerName, transWorker, 
metricsClass);
+
+        MetricsRemoteWorker remoteWorker = new 
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
         MetricsAggregateWorker aggregateWorker = new 
MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
 
         entryWorkers.put(metricsClass, aggregateWorker);
     }
 
-    private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder 
moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
+    private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder 
moduleDefineHolder,
+        IMetricsDAO metricsDAO, Model model) {
         AlarmNotifyWorker alarmNotifyWorker = new 
AlarmNotifyWorker(moduleDefineHolder);
         ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
index e4d2520..cade9b9 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
@@ -23,8 +23,11 @@ import 
org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -38,10 +41,10 @@ public class MetricsTransWorker extends 
AbstractWorker<Metrics> {
     private final MetricsPersistentWorker dayPersistenceWorker;
     private final MetricsPersistentWorker monthPersistenceWorker;
 
-    private CounterMetrics aggregationMinCounter;
-    private CounterMetrics aggregationHourCounter;
-    private CounterMetrics aggregationDayCounter;
-    private CounterMetrics aggregationMonthCounter;
+    private final CounterMetrics aggregationMinCounter;
+    private final CounterMetrics aggregationHourCounter;
+    private final CounterMetrics aggregationDayCounter;
+    private final CounterMetrics aggregationMonthCounter;
 
     public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String 
modelName,
         MetricsPersistentWorker minutePersistenceWorker,
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 4f9f3f7..ed47c80 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -18,14 +18,23 @@
 
 package org.apache.skywalking.oap.server.core.register.worker;
 
-import java.util.*;
-import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
@@ -46,7 +55,8 @@ public class InventoryStreamProcessor implements 
StreamProcessor<RegisterSource>
     }
 
     @SuppressWarnings("unchecked")
-    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, 
Class<? extends RegisterSource> inventoryClass) {
+    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
+        Class<? extends RegisterSource> inventoryClass) {
         StorageDAO storageDAO = 
moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IRegisterDAO registerDAO;
         try {
@@ -58,12 +68,13 @@ public class InventoryStreamProcessor implements 
StreamProcessor<RegisterSource>
         IModelSetter modelSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         Model model = modelSetter.putIfAbsent(inventoryClass, 
stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), 
false);
 
-        StreamDataMappingSetter streamDataMappingSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
-        streamDataMappingSetter.putIfAbsent(inventoryClass);
-
         RegisterPersistentWorker persistentWorker = new 
RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, 
stream.scopeId());
 
-        RegisterRemoteWorker remoteWorker = new 
RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
+        String remoteReceiverWorkerName = stream.name() + "_rec";
+        IWorkerInstanceSetter workerInstanceSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
+        workerInstanceSetter.put(remoteReceiverWorkerName, persistentWorker, 
inventoryClass);
+
+        RegisterRemoteWorker remoteWorker = new 
RegisterRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
 
         RegisterDistinctWorker distinctWorker = new 
RegisterDistinctWorker(moduleDefineHolder, remoteWorker);
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 5cd5079..2fb3101 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -18,17 +18,27 @@
 
 package org.apache.skywalking.oap.server.core.register.worker;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
+import 
org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
+import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
index 513a376..8366186 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
@@ -33,18 +33,18 @@ public class RegisterRemoteWorker extends 
AbstractWorker<RegisterSource> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RegisterRemoteWorker.class);
 
-    private final AbstractWorker<RegisterSource> nextWorker;
+    private final String remoteReceiverWorkerName;
     private final RemoteSenderService remoteSender;
 
-    RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, 
AbstractWorker<RegisterSource> nextWorker) {
+    RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, String 
remoteReceiverWorkerName) {
         super(moduleDefineHolder);
         this.remoteSender = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
-        this.nextWorker = nextWorker;
+        this.remoteReceiverWorkerName = remoteReceiverWorkerName;
     }
 
     @Override public final void in(RegisterSource registerSource) {
         try {
-            remoteSender.send(nextWorker.getWorkerId(), registerSource, 
Selector.ForeverFirst);
+            remoteSender.send(remoteReceiverWorkerName, registerSource, 
Selector.ForeverFirst);
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
         }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
index a028546..436ecd2 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
@@ -41,22 +41,22 @@ public class RemoteSenderService implements Service {
         this.rollingSelector = new RollingSelector();
     }
 
-    public void send(int nextWorkId, StreamData streamData, Selector selector) 
{
+    public void send(String nextWorkName, StreamData streamData, Selector 
selector) {
         RemoteClientManager clientManager = 
moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
 
         RemoteClient remoteClient;
         switch (selector) {
             case HashCode:
                 remoteClient = 
hashCodeSelector.select(clientManager.getRemoteClient(), streamData);
-                remoteClient.push(nextWorkId, streamData);
+                remoteClient.push(nextWorkName, streamData);
                 break;
             case Rolling:
                 remoteClient = 
rollingSelector.select(clientManager.getRemoteClient(), streamData);
-                remoteClient.push(nextWorkId, streamData);
+                remoteClient.push(nextWorkName, streamData);
                 break;
             case ForeverFirst:
                 remoteClient = 
foreverFirstSelector.select(clientManager.getRemoteClient(), streamData);
-                remoteClient.push(nextWorkId, streamData);
+                remoteClient.push(nextWorkName, streamData);
                 break;
         }
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 2fd5887..17d546e 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -22,14 +22,22 @@ import io.grpc.stub.StreamObserver;
 import java.util.Objects;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import 
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.RemoteHandleWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is Server-side streaming RPC implementation. It's a common 
service for OAP servers to receive message from
@@ -43,10 +51,10 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteServiceHandler.class);
 
     private final ModuleDefineHolder moduleDefineHolder;
-    private StreamDataMappingGetter streamDataMappingGetter;
     private IWorkerInstanceGetter workerInstanceGetter;
     private CounterMetrics remoteInCounter;
     private CounterMetrics remoteInErrorCounter;
+    private CounterMetrics remoteInTargetNotFoundCounter;
     private HistogramMetrics remoteInHistogram;
 
     public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
@@ -58,20 +66,15 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
         remoteInErrorCounter = 
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
             .createCounter("remote_in_error_count", "The error number(server 
side) of inside remote inside aggregate rpc.",
                 MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+        remoteInTargetNotFoundCounter = 
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
+            .createCounter("remote_in_target_not_found_count", "The error 
number(server side) of inside remote handler target worker not found. May be 
caused by unmatched OAL scrips.",
+                MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
         remoteInHistogram = 
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
             .createHistogramMetric("remote_in_latency", "The latency(server 
side) of inside remote inside aggregate rpc.",
                 MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
     }
 
     @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> 
responseObserver) {
-        if (Objects.isNull(streamDataMappingGetter)) {
-            synchronized (RemoteServiceHandler.class) {
-                if (Objects.isNull(streamDataMappingGetter)) {
-                    streamDataMappingGetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
-                }
-            }
-        }
-
         if (Objects.isNull(workerInstanceGetter)) {
             synchronized (RemoteServiceHandler.class) {
                 if (Objects.isNull(workerInstanceGetter)) {
@@ -85,15 +88,20 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
                 remoteInCounter.inc();
                 HistogramMetrics.Timer timer = remoteInHistogram.createTimer();
                 try {
-                    int streamDataId = message.getStreamDataId();
-                    int nextWorkerId = message.getNextWorkerId();
+                    String nextWorkerName = message.getNextWorkerName();
                     RemoteData remoteData = message.getRemoteData();
 
-                    Class<? extends StreamData> streamDataClass = 
streamDataMappingGetter.findClassById(streamDataId);
                     try {
-                        StreamData streamData = streamDataClass.newInstance();
+                        RemoteHandleWorker handleWorker = 
workerInstanceGetter.get(nextWorkerName);
+                        AbstractWorker nextWorker = handleWorker.getWorker();
+                        StreamData streamData = 
handleWorker.getStreamDataClass().newInstance();
                         streamData.deserialize(remoteData);
-                        workerInstanceGetter.get(nextWorkerId).in(streamData);
+                        if (nextWorker != null) {
+                            nextWorker.in(streamData);
+                        } else {
+                            remoteInTargetNotFoundCounter.inc();
+                            logger.warn("Work name [{}] not found. Check OAL 
script, make sure they are same in the whole cluster.", nextWorkerName);
+                        }
                     } catch (Throwable t) {
                         remoteInErrorCounter.inc();
                         logger.error(t.getMessage(), t);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 4927b97..fd80c9c 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -20,24 +20,29 @@ package org.apache.skywalking.oap.server.core.remote.client;
 
 import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
-import java.util.*;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import 
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
 import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is a wrapper of the gRPC client for sending message to each other OAP 
server.
- * It contains a block queue to buffering the message and sending the message 
by batch.
+ * This is a wrapper of the gRPC client for sending message to each other OAP 
server. It contains a block queue to
+ * buffering the message and sending the message by batch.
  *
  * @author peng-yongsheng
  */
@@ -48,7 +53,6 @@ public class GRPCRemoteClient implements RemoteClient {
     private final int channelSize;
     private final int bufferSize;
     private final Address address;
-    private final StreamDataMappingGetter streamDataMappingGetter;
     private final AtomicInteger concurrentStreamObserverNumber = new 
AtomicInteger(0);
     private GRPCClient client;
     private DataCarrier<RemoteMessage> carrier;
@@ -56,10 +60,8 @@ public class GRPCRemoteClient implements RemoteClient {
     private CounterMetrics remoteOutCounter;
     private CounterMetrics remoteOutErrorCounter;
 
-
-    public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, 
StreamDataMappingGetter streamDataMappingGetter, Address address, int 
channelSize,
+    public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address 
address, int channelSize,
         int bufferSize) {
-        this.streamDataMappingGetter = streamDataMappingGetter;
         this.address = address;
         this.channelSize = channelSize;
         this.bufferSize = bufferSize;
@@ -119,14 +121,12 @@ public class GRPCRemoteClient implements RemoteClient {
     /**
      * Push stream data which need to send to another OAP server.
      *
-     * @param nextWorkerId the id of a worker which will process this stream 
data.
+     * @param nextWorkerName the name of a worker which will process this 
stream data.
      * @param streamData the entity contains the values.
      */
-    @Override public void push(int nextWorkerId, StreamData streamData) {
-        int streamDataId = 
streamDataMappingGetter.findIdByClass(streamData.getClass());
+    @Override public void push(String nextWorkerName, StreamData streamData) {
         RemoteMessage.Builder builder = RemoteMessage.newBuilder();
-        builder.setNextWorkerId(nextWorkerId);
-        builder.setStreamDataId(streamDataId);
+        builder.setNextWorkerName(nextWorkerName);
         builder.setRemoteData(streamData.serialize());
 
         this.getDataCarrier().produce(builder.build());
@@ -159,9 +159,8 @@ public class GRPCRemoteClient implements RemoteClient {
     }
 
     /**
-     * Create a gRPC stream observer to sending stream data, one stream 
observer
-     * could send multiple stream data by a single consume.
-     * The max number of concurrency allowed at the same time is 10.
+     * Create a gRPC stream observer to sending stream data, one stream 
observer could send multiple stream data by a
+     * single consume. The max number of concurrency allowed at the same time 
is 10.
      *
      * @return stream observer
      */
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
index ccc216f..3330f3d 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -31,5 +31,5 @@ public interface RemoteClient extends 
Comparable<RemoteClient> {
 
     void close();
 
-    void push(int nextWorkerId, StreamData streamData);
+    void push(String nextWorkerName, StreamData streamData);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 9bfd4d1..06a963f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -18,15 +18,28 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
-import java.util.*;
-import java.util.concurrent.*;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.library.module.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.library.module.Service;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class manages the connections between OAP servers. There is a task 
schedule that will automatically query a
@@ -39,7 +52,6 @@ public class RemoteClientManager implements Service {
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteClientManager.class);
 
     private final ModuleDefineHolder moduleDefineHolder;
-    private StreamDataMappingGetter streamDataMappingGetter;
     private ClusterNodesQuery clusterNodesQuery;
     private final List<RemoteClient> clientsA;
     private final List<RemoteClient> clientsB;
@@ -76,14 +88,6 @@ public class RemoteClientManager implements Service {
                 }
             }
 
-            if (Objects.isNull(streamDataMappingGetter)) {
-                synchronized (RemoteClientManager.class) {
-                    if (Objects.isNull(streamDataMappingGetter)) {
-                        this.streamDataMappingGetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
-                    }
-                }
-            }
-
             if (logger.isDebugEnabled()) {
                 logger.debug("Refresh remote nodes collection.");
             }
@@ -199,7 +203,7 @@ public class RemoteClientManager implements Service {
                         RemoteClient client = new 
SelfRemoteClient(moduleDefineHolder, address);
                         getFreeClients().add(client);
                     } else {
-                        RemoteClient client = new 
GRPCRemoteClient(moduleDefineHolder, streamDataMappingGetter, address, 1, 3000);
+                        RemoteClient client = new 
GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
                         client.connect();
                         getFreeClients().add(client);
                     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 67827a9..110d9e9 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -18,12 +18,15 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
 /**
  * @author peng-yongsheng
@@ -53,8 +56,8 @@ public class SelfRemoteClient implements RemoteClient {
         throw new UnexpectedException("Self remote client invoked to close.");
     }
 
-    @Override public void push(int nextWorkerId, StreamData streamData) {
-        workerInstanceGetter.get(nextWorkerId).in(streamData);
+    @Override public void push(String nextWorkerName, StreamData streamData) {
+        workerInstanceGetter.get(nextWorkerName).getWorker().in(streamData);
     }
 
     @Override public int compareTo(RemoteClient o) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
deleted file mode 100644
index 00ef25b..0000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.remote.define;
-
-import java.util.*;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-
-/**
- * @author peng-yongsheng
- */
-public class StreamDataMapping implements StreamDataMappingGetter, 
StreamDataMappingSetter {
-    private List<Class<? extends StreamData>> streamClassList;
-    private final Map<Class<? extends StreamData>, Integer> classMap;
-    private final Map<Integer, Class<? extends StreamData>> idMap;
-
-    public StreamDataMapping() {
-        streamClassList = new ArrayList<>();
-        this.classMap = new HashMap<>();
-        this.idMap = new HashMap<>();
-    }
-
-    @Override public synchronized void putIfAbsent(Class<? extends StreamData> 
streamDataClass) {
-        if (classMap.containsKey(streamDataClass)) {
-            return;
-        }
-
-        streamClassList.add(streamDataClass);
-    }
-
-    public void init() {
-        /**
-         * The stream protocol use this list order to assign the ID,
-         * which is used in across node communication. This order must be 
certain.
-         */
-        Collections.sort(streamClassList, new Comparator<Class>() {
-            @Override public int compare(Class streamClass1, Class 
streamClass2) {
-                return 
streamClass1.getName().compareTo(streamClass2.getName());
-            }
-        });
-
-        for (int i = 0; i < streamClassList.size(); i++) {
-            Class<? extends StreamData> streamClass = streamClassList.get(i);
-            int streamId = i + 1;
-            classMap.put(streamClass, streamId);
-            idMap.put(streamId, streamClass);
-        }
-    }
-
-    @Override public int findIdByClass(Class<? extends StreamData> 
streamDataClass) {
-        return classMap.get(streamDataClass);
-    }
-
-    @Override public Class<? extends StreamData> findClassById(int id) {
-        return idMap.get(id);
-    }
-}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
deleted file mode 100644
index fffebaf..0000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.remote.define;
-
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.library.module.Service;
-
-/**
- * @author peng-yongsheng
- */
-public interface StreamDataMappingGetter extends Service {
-
-    int findIdByClass(Class<? extends StreamData> streamDataClass);
-
-    Class<? extends StreamData> findClassById(int id);
-}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
index c68b690..b513d38 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
@@ -19,21 +19,16 @@
 package org.apache.skywalking.oap.server.core.worker;
 
 import lombok.Getter;
-import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
 /**
  * @author peng-yongsheng
  */
 public abstract class AbstractWorker<INPUT> {
-
-    @Getter private final int workerId;
     @Getter private final ModuleDefineHolder moduleDefineHolder;
 
     public AbstractWorker(ModuleDefineHolder moduleDefineHolder) {
         this.moduleDefineHolder = moduleDefineHolder;
-        IWorkerInstanceSetter workerInstanceSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
-        this.workerId = workerInstanceSetter.put(this);
     }
 
     public abstract void in(INPUT input);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
index 9b54184..86f73f0 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
@@ -25,5 +25,6 @@ import 
org.apache.skywalking.oap.server.library.module.Service;
  */
 public interface IWorkerInstanceGetter extends Service {
 
-    AbstractWorker get(int workerId);
+    RemoteHandleWorker get(String nextWorkerName);
+
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
index eef279f..c6da429 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
@@ -18,12 +18,12 @@
 
 package org.apache.skywalking.oap.server.core.worker;
 
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.library.module.Service;
 
 /**
  * @author peng-yongsheng
  */
 public interface IWorkerInstanceSetter extends Service {
-
-    int put(AbstractWorker instance);
+    void put(String remoteReceiverWorkName, AbstractWorker instance, Class<? 
extends StreamData> streamDataClass);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
similarity index 75%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
index dfff820..270c9d6 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
@@ -16,14 +16,18 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.remote.define;
+package org.apache.skywalking.oap.server.core.worker;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.library.module.Service;
 
 /**
- * @author peng-yongsheng
+ * @author wusheng
  */
-public interface StreamDataMappingSetter extends Service {
-    void putIfAbsent(Class<? extends StreamData> streamDataClass);
+@AllArgsConstructor
+@Getter
+public class RemoteHandleWorker {
+    private AbstractWorker worker;
+    private Class<? extends StreamData> streamDataClass;
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
index 51d671a..0852a6a 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
@@ -18,28 +18,37 @@
 
 package org.apache.skywalking.oap.server.core.worker;
 
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * @author peng-yongsheng
+ * Worker Instance Service hosts all remote handler workers with the stream 
data type.
+ *
+ * @author peng-yongsheng, wusheng
  */
 public class WorkerInstancesService implements IWorkerInstanceSetter, 
IWorkerInstanceGetter {
+    private static final Logger logger = 
LoggerFactory.getLogger(WorkerInstancesService.class);
 
-    private final AtomicInteger generator = new AtomicInteger(1);
-    private final Map<Integer, AbstractWorker> instances;
+    private final Map<String, RemoteHandleWorker> instances;
 
     public WorkerInstancesService() {
         this.instances = new HashMap<>();
     }
 
-    @Override public AbstractWorker get(int workerId) {
-        return instances.get(workerId);
+    @Override public RemoteHandleWorker get(String nextWorkerName) {
+        return instances.get(nextWorkerName);
     }
 
-    @Override public int put(AbstractWorker instance) {
-        int workerId = generator.getAndIncrement();
-        instances.put(workerId, instance);
-        return workerId;
+    @Override public void put(String remoteReceiverWorkName, AbstractWorker 
instance,
+        Class<? extends StreamData> streamDataClass) {
+        if (instances.containsKey(remoteReceiverWorkName)) {
+            throw new UnexpectedException("Duplicate worker name:" + 
remoteReceiverWorkName);
+        }
+        instances.put(remoteReceiverWorkName, new RemoteHandleWorker(instance, 
streamDataClass));
+        logger.debug("Worker {} has been registered as {}", 
instance.toString(), remoteReceiverWorkName);
     }
 }
diff --git a/oap-server/server-core/src/main/proto/RemoteService.proto 
b/oap-server/server-core/src/main/proto/RemoteService.proto
index 853ea82..73ed7a6 100644
--- a/oap-server/server-core/src/main/proto/RemoteService.proto
+++ b/oap-server/server-core/src/main/proto/RemoteService.proto
@@ -27,8 +27,7 @@ service RemoteService {
 }
 
 message RemoteMessage {
-    int32 nextWorkerId = 1;
-    int32 streamDataId = 2;
+    string nextWorkerName = 1;
     RemoteData remoteData = 3;
 }
 
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
index b9081d1..50d1336 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
@@ -18,22 +18,34 @@
 
 package org.apache.skywalking.oap.server.core.remote;
 
-import io.grpc.inprocess.*;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.stub.StreamObserver;
 import io.grpc.testing.GrpcCleanupRule;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import 
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.*;
+import 
org.apache.skywalking.oap.server.library.module.DuplicateProviderException;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import 
org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
-
-import static org.mockito.Mockito.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * @author peng-yongsheng
@@ -46,18 +58,12 @@ public class RemoteServiceHandlerTestCase {
     @Test
     public void callTest() throws DuplicateProviderException, 
ProviderNotFoundException, IOException {
         final int streamDataClassId = 1;
-        final int testWorkerId = 1;
+        final String testWorkerId = "mock-worker";
 
         ModuleManagerTesting moduleManager = new ModuleManagerTesting();
         ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
         moduleManager.put(CoreModule.NAME, moduleDefine);
 
-        StreamDataMappingGetter classGetter = 
mock(StreamDataMappingGetter.class);
-        Class dataClass = TestRemoteData.class;
-        
when(classGetter.findClassById(streamDataClassId)).thenReturn(dataClass);
-
-        
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
 classGetter);
-
         String serverName = InProcessServerBuilder.generateName();
         MetricsCreator metricsCreator = mock(MetricsCreator.class);
         when(metricsCreator.createCounter(any(), any(), any(), 
any())).thenReturn(new CounterMetrics() {
@@ -101,8 +107,7 @@ public class RemoteServiceHandlerTestCase {
         });
 
         RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
-        remoteMessage.setStreamDataId(streamDataClassId);
-        remoteMessage.setNextWorkerId(testWorkerId);
+        remoteMessage.setNextWorkerName(testWorkerId);
 
         RemoteData.Builder remoteData = RemoteData.newBuilder();
         remoteData.addDataStrings("test1");
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
index be34824..7f506f9 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -19,17 +19,21 @@
 package org.apache.skywalking.oap.server.core.remote.client;
 
 import java.util.concurrent.TimeUnit;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
 import org.junit.Assert;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 /**
  * @author peng-yongsheng
@@ -53,29 +57,17 @@ public class GRPCRemoteClientRealClient {
         moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
         
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class,
 metricsCreator);
 
-        GRPCRemoteClient remoteClient = spy(new 
GRPCRemoteClient(moduleManager, new TestMappingGetter(), address, 1, 10));
+        GRPCRemoteClient remoteClient = spy(new 
GRPCRemoteClient(moduleManager, address, 1, 10));
         remoteClient.connect();
 
         for (int i = 0; i < 10000; i++) {
-            remoteClient.push(1, new TestStreamData());
+            remoteClient.push("mock_remote", new TestStreamData());
             TimeUnit.SECONDS.sleep(1);
         }
 
         TimeUnit.MINUTES.sleep(10);
     }
 
-    public static class TestMappingGetter implements StreamDataMappingGetter {
-
-        @Override public int findIdByClass(Class streamDataClass) {
-            return 1;
-        }
-
-        @Override public Class<StreamData> findClassById(int id) {
-            Class<?> clazz = TestStreamData.class;
-            return (Class<StreamData>)clazz;
-        }
-    }
-
     public static class TestStreamData extends StreamData {
 
         private long value;
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
index 51945f2..a0c2113 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
@@ -21,10 +21,10 @@ package org.apache.skywalking.oap.server.core.remote.client;
 import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
-import org.apache.skywalking.oap.server.testing.module.*;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
 
 /**
  * @author peng-yongsheng
@@ -36,8 +36,6 @@ public class GRPCRemoteClientRealServer {
         ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
         moduleManager.put(CoreModule.NAME, moduleDefine);
 
-        
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
 new GRPCRemoteClientRealClient.TestMappingGetter());
-
         GRPCServer server = new GRPCServer("localhost", 10000);
         server.initialize();
 
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
index 8a6494f..3513ca3 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -22,26 +22,36 @@ import io.grpc.testing.GrpcServerRule;
 import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
-
-import static org.mockito.Mockito.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 /**
  * @author peng-yongsheng
  */
 public class GRPCRemoteClientTestCase {
 
-    private final int nextWorkerId = 1;
+    private final String nextWorkerId = "mock-worker";
     private ModuleManagerTesting moduleManager;
-    private StreamDataMappingGetter classGetter;
     @Rule public final GrpcServerRule grpcServerRule = new 
GrpcServerRule().directExecutor();
 
     @Before
@@ -50,9 +60,6 @@ public class GRPCRemoteClientTestCase {
         ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
         moduleManager.put(CoreModule.NAME, moduleDefine);
 
-        classGetter = mock(StreamDataMappingGetter.class);
-        
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
 classGetter);
-
         WorkerInstancesService workerInstancesService = new 
WorkerInstancesService();
         
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class,
 workerInstancesService);
         
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class,
 workerInstancesService);
@@ -79,16 +86,11 @@ public class GRPCRemoteClientTestCase {
         grpcServerRule.getServiceRegistry().addService(new 
RemoteServiceHandler(moduleManager));
 
         Address address = new Address("not-important", 11, false);
-        GRPCRemoteClient remoteClient = spy(new 
GRPCRemoteClient(moduleManager, classGetter, address, 1, 10));
+        GRPCRemoteClient remoteClient = spy(new 
GRPCRemoteClient(moduleManager, address, 1, 10));
         remoteClient.connect();
 
         doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
 
-        when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
-
-        Class dataClass = TestStreamData.class;
-        when(classGetter.findClassById(1)).thenReturn(dataClass);
-
         for (int i = 0; i < 12; i++) {
             remoteClient.push(nextWorkerId, new TestStreamData());
         }
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
index 2741af2..e83aa63 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -18,16 +18,23 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Test;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * @author peng-yongsheng
@@ -46,8 +53,6 @@ public class RemoteClientManagerTestCase {
         ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
         
clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class,
 clusterNodesQuery);
 
-        StreamDataMappingGetter streamDataMappingGetter = 
mock(StreamDataMappingGetter.class);
-        
coreModuleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
 streamDataMappingGetter);
 
         MetricsCreator metricsCreator = mock(MetricsCreator.class);
         when(metricsCreator.createGauge(any(), any(), any(), 
any())).thenReturn(new GaugeMetrics() {
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
index 1baa2c0..6cfcd37 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage;
 
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.CoreModuleProvider;
-import org.apache.skywalking.oap.server.core.remote.define.StreamDataMapping;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
 import org.apache.skywalking.oap.server.library.client.Client;
@@ -37,7 +36,6 @@ public class StorageInstallerTestCase {
 
     @Test
     public void testInstall() throws StorageException, 
ServiceNotProvidedException {
-        StreamDataMapping streamDataMapping = new StreamDataMapping();
         CoreModuleProvider moduleProvider = 
Mockito.mock(CoreModuleProvider.class);
         CoreModule moduleDefine = Mockito.spy(CoreModule.class);
         ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
@@ -45,7 +43,6 @@ public class StorageInstallerTestCase {
         Whitebox.setInternalState(moduleDefine, "loadedProvider", 
moduleProvider);
 
         
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
-        
Mockito.when(moduleProvider.getService(StreamDataMapping.class)).thenReturn(streamDataMapping);
 
 //        streamDataMapping.generate();
 

Reply via email to