This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch workid2name
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/workid2name by this push:
     new e8caf4b  Remote metrics and inventory classes mapping too.
e8caf4b is described below

commit e8caf4bcebd62f719cadb9b4cc347501d72acc10
Author: Wu Sheng <[email protected]>
AuthorDate: Sat Jul 20 21:05:20 2019 +0800

    Remote metrics and inventory classes mapping too.
---
 .../skywalking/oap/server/core/CoreModule.java     | 39 ++++++++----
 .../oap/server/core/CoreModuleProvider.java        | 74 ++++++++++++++++------
 .../analysis/worker/MetricsStreamProcessor.java    | 27 ++++----
 .../core/analysis/worker/MetricsTransWorker.java   | 30 +++++++--
 .../register/worker/InventoryStreamProcessor.java  | 28 ++++----
 .../register/worker/RegisterPersistentWorker.java  | 21 +++++-
 .../server/core/remote/RemoteServiceHandler.java   | 34 ++++------
 .../core/remote/client/GRPCRemoteClient.java       | 35 +++++-----
 .../core/remote/client/RemoteClientManager.java    | 40 ++++++------
 .../core/remote/define/StreamDataMapping.java      | 72 ---------------------
 .../remote/define/StreamDataMappingSetter.java     | 29 ---------
 .../IRemoteHandleWorker.java}                      | 25 +++++---
 .../server/core/worker/WorkerInstancesService.java |  9 ++-
 .../server-core/src/main/proto/RemoteService.proto |  3 +-
 .../core/remote/RemoteServiceHandlerTestCase.java  | 39 +++++++-----
 .../remote/client/GRPCRemoteClientRealClient.java  | 26 +++-----
 .../remote/client/GRPCRemoteClientRealServer.java  |  6 +-
 .../remote/client/GRPCRemoteClientTestCase.java    | 36 ++++++-----
 .../remote/client/RemoteClientManagerTestCase.java | 23 ++++---
 .../core/storage/StorageInstallerTestCase.java     |  3 -
 20 files changed, 301 insertions(+), 298 deletions(-)

diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index f31445c..4bfef5a 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -18,18 +18,37 @@
 
 package org.apache.skywalking.oap.server.core;
 
-import java.util.*;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.config.*;
-import org.apache.skywalking.oap.server.core.query.*;
-import org.apache.skywalking.oap.server.core.register.service.*;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import 
org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import 
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import 
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.LogQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import 
org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
 import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
 import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
-import org.apache.skywalking.oap.server.core.remote.define.*;
-import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
 import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.storage.model.*;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 
 /**
@@ -82,8 +101,6 @@ public class CoreModule extends ModuleDefine {
         classes.add(IModelSetter.class);
         classes.add(IModelGetter.class);
         classes.add(IModelOverride.class);
-        classes.add(StreamDataMappingGetter.class);
-        classes.add(StreamDataMappingSetter.class);
         classes.add(RemoteClientManager.class);
         classes.add(RemoteSenderService.class);
     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 2c8f78a..f05b776 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -20,26 +20,65 @@ package org.apache.skywalking.oap.server.core;
 
 import java.io.IOException;
 import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
+import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import 
org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import 
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import 
org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import 
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
 import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
 import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader;
-import org.apache.skywalking.oap.server.core.query.*;
-import org.apache.skywalking.oap.server.core.register.service.*;
-import org.apache.skywalking.oap.server.core.remote.*;
-import org.apache.skywalking.oap.server.core.remote.client.*;
-import org.apache.skywalking.oap.server.core.remote.define.*;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.LogQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import 
org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
+import 
org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
 import 
org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
-import org.apache.skywalking.oap.server.core.server.*;
-import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
 import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
 import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
-import org.apache.skywalking.oap.server.core.worker.*;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import 
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
 import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
@@ -56,7 +95,6 @@ public class CoreModuleProvider extends ModuleProvider {
     private RemoteClientManager remoteClientManager;
     private final AnnotationScan annotationScan;
     private final StorageModels storageModels;
-    private final StreamDataMapping streamDataMapping;
     private final SourceReceiverImpl receiver;
     private StreamAnnotationListener streamAnnotationListener;
     private OALEngine oalEngine;
@@ -65,7 +103,6 @@ public class CoreModuleProvider extends ModuleProvider {
         super();
         this.moduleConfig = new CoreModuleConfig();
         this.annotationScan = new AnnotationScan();
-        this.streamDataMapping = new StreamDataMapping();
         this.storageModels = new StorageModels();
         this.receiver = new SourceReceiverImpl();
     }
@@ -129,9 +166,6 @@ public class CoreModuleProvider extends ModuleProvider {
 
         this.registerServiceImplementation(SourceReceiver.class, receiver);
 
-        this.registerServiceImplementation(StreamDataMappingGetter.class, 
streamDataMapping);
-        this.registerServiceImplementation(StreamDataMappingSetter.class, 
streamDataMapping);
-
         WorkerInstancesService instancesService = new WorkerInstancesService();
         this.registerServiceImplementation(IWorkerInstanceGetter.class, 
instancesService);
         this.registerServiceImplementation(IWorkerInstanceSetter.class, 
instancesService);
@@ -178,8 +212,6 @@ public class CoreModuleProvider extends ModuleProvider {
             annotationScan.scan();
 
             oalEngine.notifyAllListeners();
-
-            streamDataMapping.init();
         } catch (IOException | IllegalAccessException | InstantiationException 
e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 33488d3..3253792 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -18,16 +18,25 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import lombok.Getter;
-import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
@@ -68,9 +77,6 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         IModelSetter modelSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         DownsamplingConfigService configService = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
 
-        StreamDataMappingSetter streamDataMappingSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
-        streamDataMappingSetter.putIfAbsent(metricsClass);
-
         MetricsPersistentWorker hourPersistentWorker = null;
         MetricsPersistentWorker dayPersistentWorker = null;
         MetricsPersistentWorker monthPersistentWorker = null;
@@ -91,8 +97,7 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), 
new Storage(stream.name(), true, true, Downsampling.Minute));
         MetricsPersistentWorker minutePersistentWorker = 
minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
 
-
-        MetricsTransWorker transWorker = new 
MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, 
hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
+        MetricsTransWorker transWorker = new 
MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, 
hourPersistentWorker, dayPersistentWorker, monthPersistentWorker, metricsClass);
 
         String remoteReceiverWorkerName = stream.name() + "_rec";
         IWorkerInstanceSetter workerInstanceSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
index e4d2520..e10a8e8 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
@@ -20,7 +20,10 @@ package 
org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.Objects;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IRemoteHandleWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.*;
@@ -29,7 +32,7 @@ import org.slf4j.*;
 /**
  * @author peng-yongsheng
  */
-public class MetricsTransWorker extends AbstractWorker<Metrics> {
+public class MetricsTransWorker extends AbstractWorker<Metrics> implements 
IRemoteHandleWorker<Metrics> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(MetricsTransWorker.class);
 
@@ -37,22 +40,25 @@ public class MetricsTransWorker extends 
AbstractWorker<Metrics> {
     private final MetricsPersistentWorker hourPersistenceWorker;
     private final MetricsPersistentWorker dayPersistenceWorker;
     private final MetricsPersistentWorker monthPersistenceWorker;
+    private final Class<? extends Metrics> metricsClass;
 
-    private CounterMetrics aggregationMinCounter;
-    private CounterMetrics aggregationHourCounter;
-    private CounterMetrics aggregationDayCounter;
-    private CounterMetrics aggregationMonthCounter;
+    private final CounterMetrics aggregationMinCounter;
+    private final CounterMetrics aggregationHourCounter;
+    private final CounterMetrics aggregationDayCounter;
+    private final CounterMetrics aggregationMonthCounter;
 
     public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String 
modelName,
         MetricsPersistentWorker minutePersistenceWorker,
         MetricsPersistentWorker hourPersistenceWorker,
         MetricsPersistentWorker dayPersistenceWorker,
-        MetricsPersistentWorker monthPersistenceWorker) {
+        MetricsPersistentWorker monthPersistenceWorker,
+        Class<? extends Metrics> metricsClass) {
         super(moduleDefineHolder);
         this.minutePersistenceWorker = minutePersistenceWorker;
         this.hourPersistenceWorker = hourPersistenceWorker;
         this.dayPersistenceWorker = dayPersistenceWorker;
         this.monthPersistenceWorker = monthPersistenceWorker;
+        this.metricsClass = metricsClass;
 
         MetricsCreator metricsCreator = 
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
         aggregationMinCounter = 
metricsCreator.createCounter("metrics_aggregation", "The number of rows in 
aggregation",
@@ -65,6 +71,18 @@ public class MetricsTransWorker extends 
AbstractWorker<Metrics> {
             new MetricsTag.Keys("metricName", "level", "dimensionality"), new 
MetricsTag.Values(modelName, "2", "month"));
     }
 
+    @Override public Metrics deserialize(RemoteData remoteData) {
+        try {
+            StreamData streamData = metricsClass.newInstance();
+            Metrics metrics = (Metrics)streamData;
+            streamData.deserialize(remoteData);
+            return metrics;
+        } catch (Exception e) {
+            logger.error("Metrics Class " + metricsClass.getName() + " can't 
be instantiation.", e);
+            throw new IllegalStateException("Metrics Class " + 
metricsClass.getName() + " can't be instantiation.");
+        }
+    }
+
     @Override public void in(Metrics metrics) {
         if (Objects.nonNull(hourPersistenceWorker)) {
             aggregationMonthCounter.inc();
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 042a9bc..9341d51 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -18,14 +18,22 @@
 
 package org.apache.skywalking.oap.server.core.register.worker;
 
-import java.util.*;
-import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
@@ -47,7 +55,8 @@ public class InventoryStreamProcessor implements 
StreamProcessor<RegisterSource>
     }
 
     @SuppressWarnings("unchecked")
-    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, 
Class<? extends RegisterSource> inventoryClass) {
+    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
+        Class<? extends RegisterSource> inventoryClass) {
         StorageDAO storageDAO = 
moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
         IRegisterDAO registerDAO;
         try {
@@ -59,10 +68,7 @@ public class InventoryStreamProcessor implements 
StreamProcessor<RegisterSource>
         IModelSetter modelSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         Model model = modelSetter.putIfAbsent(inventoryClass, 
stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
 
-        StreamDataMappingSetter streamDataMappingSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
-        streamDataMappingSetter.putIfAbsent(inventoryClass);
-
-        RegisterPersistentWorker persistentWorker = new 
RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, 
stream.scopeId());
+        RegisterPersistentWorker persistentWorker = new 
RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, 
stream.scopeId(), inventoryClass);
 
         String remoteReceiverWorkerName = stream.name() + "_rec";
         IWorkerInstanceSetter workerInstanceSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 5cd5079..bcbe798 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -24,16 +24,19 @@ import 
org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IRemoteHandleWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
-public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
+public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> 
implements IRemoteHandleWorker<RegisterSource> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RegisterPersistentWorker.class);
 
@@ -43,13 +46,15 @@ public class RegisterPersistentWorker extends 
AbstractWorker<RegisterSource> {
     private final IRegisterLockDAO registerLockDAO;
     private final IRegisterDAO registerDAO;
     private final DataCarrier<RegisterSource> dataCarrier;
+    private final Class<? extends RegisterSource> inventoryClass;
 
     RegisterPersistentWorker(ModuleDefineHolder moduleDefineHolder, String 
modelName,
-        IRegisterDAO registerDAO, int scopeId) {
+        IRegisterDAO registerDAO, int scopeId, Class<? extends RegisterSource> 
inventoryClass) {
         super(moduleDefineHolder);
         this.modelName = modelName;
         this.sources = new HashMap<>();
         this.registerDAO = registerDAO;
+        this.inventoryClass = inventoryClass;
         this.registerLockDAO = 
moduleDefineHolder.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
         this.scopeId = scopeId;
         this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + 
modelName, 1, 1000);
@@ -69,6 +74,18 @@ public class RegisterPersistentWorker extends 
AbstractWorker<RegisterSource> {
         this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new 
RegisterPersistentWorker.PersistentConsumer(this));
     }
 
+    @Override public RegisterSource deserialize(RemoteData remoteData) {
+        try {
+            StreamData streamData = inventoryClass.newInstance();
+            RegisterSource registerSource = (RegisterSource)streamData;
+            registerSource.deserialize(remoteData);
+            return registerSource;
+        } catch (Exception e) {
+            logger.error("Inventory Class " + inventoryClass.getName() + " 
can't be instantiation.", e);
+            throw new IllegalStateException("Inventory Class " + 
inventoryClass.getName() + " can't be instantiation.");
+        }
+    }
+
     @Override public final void in(RegisterSource registerSource) {
         registerSource.setEndOfBatchContext(new EndOfBatchContext(false));
         dataCarrier.produce(registerSource);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index f8fb8f7..8d3c438 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -21,16 +21,22 @@ package org.apache.skywalking.oap.server.core.remote;
 import io.grpc.stub.StreamObserver;
 import java.util.Objects;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import 
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IRemoteHandleWorker;
 import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is Server-side streaming RPC implementation. It's a common 
service for OAP servers to receive message from
@@ -44,7 +50,6 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteServiceHandler.class);
 
     private final ModuleDefineHolder moduleDefineHolder;
-    private StreamDataMappingGetter streamDataMappingGetter;
     private IWorkerInstanceGetter workerInstanceGetter;
     private CounterMetrics remoteInCounter;
     private CounterMetrics remoteInErrorCounter;
@@ -69,14 +74,6 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
     }
 
     @Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> 
responseObserver) {
-        if (Objects.isNull(streamDataMappingGetter)) {
-            synchronized (RemoteServiceHandler.class) {
-                if (Objects.isNull(streamDataMappingGetter)) {
-                    streamDataMappingGetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
-                }
-            }
-        }
-
         if (Objects.isNull(workerInstanceGetter)) {
             synchronized (RemoteServiceHandler.class) {
                 if (Objects.isNull(workerInstanceGetter)) {
@@ -90,17 +87,14 @@ public class RemoteServiceHandler extends 
RemoteServiceGrpc.RemoteServiceImplBas
                 remoteInCounter.inc();
                 HistogramMetrics.Timer timer = remoteInHistogram.createTimer();
                 try {
-                    int streamDataId = message.getStreamDataId();
-                    String nextWorkerName = message.getNextWorkName();
+                    String nextWorkerName = message.getNextWorkerName();
                     RemoteData remoteData = message.getRemoteData();
 
-                    Class<? extends StreamData> streamDataClass = 
streamDataMappingGetter.findClassById(streamDataId);
                     try {
-                        StreamData streamData = streamDataClass.newInstance();
-                        streamData.deserialize(remoteData);
                         AbstractWorker nextWorker = 
workerInstanceGetter.get(nextWorkerName);
+                        IRemoteHandleWorker handleWorker = 
(IRemoteHandleWorker)nextWorker;
                         if (nextWorker != null) {
-                            nextWorker.in(streamData);
+                            
nextWorker.in(handleWorker.deserialize(remoteData));
                         } else {
                             remoteInTargetNotFoundCounter.inc();
                             logger.warn("Work name [{}] not found. Check OAL 
script, make sure they are same in the whole cluster.", nextWorkerName);
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index c4f9e35..fd80c9c 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -20,24 +20,29 @@ package org.apache.skywalking.oap.server.core.remote.client;
 
 import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
-import java.util.*;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import 
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
 import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is a wrapper of the gRPC client for sending message to each other OAP 
server.
- * It contains a block queue to buffering the message and sending the message 
by batch.
+ * This is a wrapper of the gRPC client for sending message to each other OAP 
server. It contains a block queue to
+ * buffering the message and sending the message by batch.
  *
  * @author peng-yongsheng
  */
@@ -48,7 +53,6 @@ public class GRPCRemoteClient implements RemoteClient {
     private final int channelSize;
     private final int bufferSize;
     private final Address address;
-    private final StreamDataMappingGetter streamDataMappingGetter;
     private final AtomicInteger concurrentStreamObserverNumber = new 
AtomicInteger(0);
     private GRPCClient client;
     private DataCarrier<RemoteMessage> carrier;
@@ -56,10 +60,8 @@ public class GRPCRemoteClient implements RemoteClient {
     private CounterMetrics remoteOutCounter;
     private CounterMetrics remoteOutErrorCounter;
 
-
-    public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, 
StreamDataMappingGetter streamDataMappingGetter, Address address, int 
channelSize,
+    public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address 
address, int channelSize,
         int bufferSize) {
-        this.streamDataMappingGetter = streamDataMappingGetter;
         this.address = address;
         this.channelSize = channelSize;
         this.bufferSize = bufferSize;
@@ -123,10 +125,8 @@ public class GRPCRemoteClient implements RemoteClient {
      * @param streamData the entity contains the values.
      */
     @Override public void push(String nextWorkerName, StreamData streamData) {
-        int streamDataId = 
streamDataMappingGetter.findIdByClass(streamData.getClass());
         RemoteMessage.Builder builder = RemoteMessage.newBuilder();
-        builder.setNextWorkName(nextWorkerName);
-        builder.setStreamDataId(streamDataId);
+        builder.setNextWorkerName(nextWorkerName);
         builder.setRemoteData(streamData.serialize());
 
         this.getDataCarrier().produce(builder.build());
@@ -159,9 +159,8 @@ public class GRPCRemoteClient implements RemoteClient {
     }
 
     /**
-     * Create a gRPC stream observer to sending stream data, one stream 
observer
-     * could send multiple stream data by a single consume.
-     * The max number of concurrency allowed at the same time is 10.
+     * Create a gRPC stream observer to sending stream data, one stream 
observer could send multiple stream data by a
+     * single consume. The max number of concurrency allowed at the same time 
is 10.
      *
      * @return stream observer
      */
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 9bfd4d1..06a963f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -18,15 +18,28 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
-import java.util.*;
-import java.util.concurrent.*;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.library.module.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.library.module.Service;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class manages the connections between OAP servers. There is a task 
schedule that will automatically query a
@@ -39,7 +52,6 @@ public class RemoteClientManager implements Service {
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteClientManager.class);
 
     private final ModuleDefineHolder moduleDefineHolder;
-    private StreamDataMappingGetter streamDataMappingGetter;
     private ClusterNodesQuery clusterNodesQuery;
     private final List<RemoteClient> clientsA;
     private final List<RemoteClient> clientsB;
@@ -76,14 +88,6 @@ public class RemoteClientManager implements Service {
                 }
             }
 
-            if (Objects.isNull(streamDataMappingGetter)) {
-                synchronized (RemoteClientManager.class) {
-                    if (Objects.isNull(streamDataMappingGetter)) {
-                        this.streamDataMappingGetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
-                    }
-                }
-            }
-
             if (logger.isDebugEnabled()) {
                 logger.debug("Refresh remote nodes collection.");
             }
@@ -199,7 +203,7 @@ public class RemoteClientManager implements Service {
                         RemoteClient client = new 
SelfRemoteClient(moduleDefineHolder, address);
                         getFreeClients().add(client);
                     } else {
-                        RemoteClient client = new 
GRPCRemoteClient(moduleDefineHolder, streamDataMappingGetter, address, 1, 3000);
+                        RemoteClient client = new 
GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
                         client.connect();
                         getFreeClients().add(client);
                     }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
deleted file mode 100644
index 00ef25b..0000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.remote.define;
-
-import java.util.*;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-
-/**
- * @author peng-yongsheng
- */
-public class StreamDataMapping implements StreamDataMappingGetter, 
StreamDataMappingSetter {
-    private List<Class<? extends StreamData>> streamClassList;
-    private final Map<Class<? extends StreamData>, Integer> classMap;
-    private final Map<Integer, Class<? extends StreamData>> idMap;
-
-    public StreamDataMapping() {
-        streamClassList = new ArrayList<>();
-        this.classMap = new HashMap<>();
-        this.idMap = new HashMap<>();
-    }
-
-    @Override public synchronized void putIfAbsent(Class<? extends StreamData> 
streamDataClass) {
-        if (classMap.containsKey(streamDataClass)) {
-            return;
-        }
-
-        streamClassList.add(streamDataClass);
-    }
-
-    public void init() {
-        /**
-         * The stream protocol use this list order to assign the ID,
-         * which is used in across node communication. This order must be 
certain.
-         */
-        Collections.sort(streamClassList, new Comparator<Class>() {
-            @Override public int compare(Class streamClass1, Class 
streamClass2) {
-                return 
streamClass1.getName().compareTo(streamClass2.getName());
-            }
-        });
-
-        for (int i = 0; i < streamClassList.size(); i++) {
-            Class<? extends StreamData> streamClass = streamClassList.get(i);
-            int streamId = i + 1;
-            classMap.put(streamClass, streamId);
-            idMap.put(streamId, streamClass);
-        }
-    }
-
-    @Override public int findIdByClass(Class<? extends StreamData> 
streamDataClass) {
-        return classMap.get(streamDataClass);
-    }
-
-    @Override public Class<? extends StreamData> findClassById(int id) {
-        return idMap.get(id);
-    }
-}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
deleted file mode 100644
index dfff820..0000000
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.remote.define;
-
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.library.module.Service;
-
-/**
- * @author peng-yongsheng
- */
-public interface StreamDataMappingSetter extends Service {
-    void putIfAbsent(Class<? extends StreamData> streamDataClass);
-}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IRemoteHandleWorker.java
similarity index 60%
rename from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
rename to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IRemoteHandleWorker.java
index fffebaf..015454c 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IRemoteHandleWorker.java
@@ -16,17 +16,24 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.remote.define;
+package org.apache.skywalking.oap.server.core.worker;
 
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 
 /**
- * @author peng-yongsheng
+ * The interface implementation could support deserialize.
+ *
+ * @param <INPUT> class with deserialize supported in OAP inside remote rpc.
+ *
+ * @author wusheng
  */
-public interface StreamDataMappingGetter extends Service {
-
-    int findIdByClass(Class<? extends StreamData> streamDataClass);
-
-    Class<? extends StreamData> findClassById(int id);
+public interface IRemoteHandleWorker<INPUT> {
+    /**
+     * Do deserialize
+     *
+     * @param remoteData hosts data from gRPC
+     *
+     * @return instance brings data in remote data.
+     */
+    INPUT deserialize(RemoteData remoteData);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
index 5a80c87..37fb586 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
@@ -21,11 +21,15 @@ package org.apache.skywalking.oap.server.core.worker;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
+import 
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author peng-yongsheng
+ * Worker Instance Service hosts all remote handler workers, including metrics 
and register.
+ * All this kind of works should implemenet {@link IRemoteHandleWorker} to 
adapt {@link RemoteServiceGrpc}
+ *
+ * @author peng-yongsheng, wusheng
  */
 public class WorkerInstancesService implements IWorkerInstanceSetter, 
IWorkerInstanceGetter {
     private static final Logger logger = 
LoggerFactory.getLogger(WorkerInstancesService.class);
@@ -44,6 +48,9 @@ public class WorkerInstancesService implements 
IWorkerInstanceSetter, IWorkerIns
         if (instances.containsKey(remoteReceiverWorkName)) {
             throw new UnexpectedException("Duplicate worker name:" + 
remoteReceiverWorkName);
         }
+        if (!(instance instanceof IRemoteHandleWorker)) {
+            throw new IllegalStateException("Worker " + 
instance.getClass().getName() + " must implement IRemoteHandleWorker.");
+        }
         instances.put(remoteReceiverWorkName, instance);
         logger.debug("Worker {} has been registered as {}", 
instance.toString(), remoteReceiverWorkName);
     }
diff --git a/oap-server/server-core/src/main/proto/RemoteService.proto 
b/oap-server/server-core/src/main/proto/RemoteService.proto
index 885a2b4..73ed7a6 100644
--- a/oap-server/server-core/src/main/proto/RemoteService.proto
+++ b/oap-server/server-core/src/main/proto/RemoteService.proto
@@ -27,8 +27,7 @@ service RemoteService {
 }
 
 message RemoteMessage {
-    string nextWorkName = 1;
-    int32 streamDataId = 2;
+    string nextWorkerName = 1;
     RemoteData remoteData = 3;
 }
 
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
index 7bd5dfb..50d1336 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
@@ -18,22 +18,34 @@
 
 package org.apache.skywalking.oap.server.core.remote;
 
-import io.grpc.inprocess.*;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.stub.StreamObserver;
 import io.grpc.testing.GrpcCleanupRule;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import 
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.*;
+import 
org.apache.skywalking.oap.server.library.module.DuplicateProviderException;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import 
org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
-
-import static org.mockito.Mockito.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * @author peng-yongsheng
@@ -52,12 +64,6 @@ public class RemoteServiceHandlerTestCase {
         ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
         moduleManager.put(CoreModule.NAME, moduleDefine);
 
-        StreamDataMappingGetter classGetter = 
mock(StreamDataMappingGetter.class);
-        Class dataClass = TestRemoteData.class;
-        
when(classGetter.findClassById(streamDataClassId)).thenReturn(dataClass);
-
-        
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
 classGetter);
-
         String serverName = InProcessServerBuilder.generateName();
         MetricsCreator metricsCreator = mock(MetricsCreator.class);
         when(metricsCreator.createCounter(any(), any(), any(), 
any())).thenReturn(new CounterMetrics() {
@@ -101,8 +107,7 @@ public class RemoteServiceHandlerTestCase {
         });
 
         RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
-        remoteMessage.setStreamDataId(streamDataClassId);
-        remoteMessage.setNextWorkName(testWorkerId);
+        remoteMessage.setNextWorkerName(testWorkerId);
 
         RemoteData.Builder remoteData = RemoteData.newBuilder();
         remoteData.addDataStrings("test1");
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
index 262ec18..7f506f9 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -19,17 +19,21 @@
 package org.apache.skywalking.oap.server.core.remote.client;
 
 import java.util.concurrent.TimeUnit;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
 import org.junit.Assert;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 /**
  * @author peng-yongsheng
@@ -53,7 +57,7 @@ public class GRPCRemoteClientRealClient {
         moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
         
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class,
 metricsCreator);
 
-        GRPCRemoteClient remoteClient = spy(new 
GRPCRemoteClient(moduleManager, new TestMappingGetter(), address, 1, 10));
+        GRPCRemoteClient remoteClient = spy(new 
GRPCRemoteClient(moduleManager, address, 1, 10));
         remoteClient.connect();
 
         for (int i = 0; i < 10000; i++) {
@@ -64,18 +68,6 @@ public class GRPCRemoteClientRealClient {
         TimeUnit.MINUTES.sleep(10);
     }
 
-    public static class TestMappingGetter implements StreamDataMappingGetter {
-
-        @Override public int findIdByClass(Class streamDataClass) {
-            return 1;
-        }
-
-        @Override public Class<StreamData> findClassById(int id) {
-            Class<?> clazz = TestStreamData.class;
-            return (Class<StreamData>)clazz;
-        }
-    }
-
     public static class TestStreamData extends StreamData {
 
         private long value;
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
index 51945f2..a0c2113 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
@@ -21,10 +21,10 @@ package org.apache.skywalking.oap.server.core.remote.client;
 import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
-import org.apache.skywalking.oap.server.testing.module.*;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
 
 /**
  * @author peng-yongsheng
@@ -36,8 +36,6 @@ public class GRPCRemoteClientRealServer {
         ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
         moduleManager.put(CoreModule.NAME, moduleDefine);
 
-        
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
 new GRPCRemoteClientRealClient.TestMappingGetter());
-
         GRPCServer server = new GRPCServer("localhost", 10000);
         server.initialize();
 
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
index a2281f8..3513ca3 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -22,17 +22,28 @@ import io.grpc.testing.GrpcServerRule;
 import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
 import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
-
-import static org.mockito.Mockito.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 /**
  * @author peng-yongsheng
@@ -41,7 +52,6 @@ public class GRPCRemoteClientTestCase {
 
     private final String nextWorkerId = "mock-worker";
     private ModuleManagerTesting moduleManager;
-    private StreamDataMappingGetter classGetter;
     @Rule public final GrpcServerRule grpcServerRule = new 
GrpcServerRule().directExecutor();
 
     @Before
@@ -50,9 +60,6 @@ public class GRPCRemoteClientTestCase {
         ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
         moduleManager.put(CoreModule.NAME, moduleDefine);
 
-        classGetter = mock(StreamDataMappingGetter.class);
-        
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
 classGetter);
-
         WorkerInstancesService workerInstancesService = new 
WorkerInstancesService();
         
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class,
 workerInstancesService);
         
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class,
 workerInstancesService);
@@ -79,16 +86,11 @@ public class GRPCRemoteClientTestCase {
         grpcServerRule.getServiceRegistry().addService(new 
RemoteServiceHandler(moduleManager));
 
         Address address = new Address("not-important", 11, false);
-        GRPCRemoteClient remoteClient = spy(new 
GRPCRemoteClient(moduleManager, classGetter, address, 1, 10));
+        GRPCRemoteClient remoteClient = spy(new 
GRPCRemoteClient(moduleManager, address, 1, 10));
         remoteClient.connect();
 
         doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
 
-        when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
-
-        Class dataClass = TestStreamData.class;
-        when(classGetter.findClassById(1)).thenReturn(dataClass);
-
         for (int i = 0; i < 12; i++) {
             remoteClient.push(nextWorkerId, new TestStreamData());
         }
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
index 2741af2..e83aa63 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -18,16 +18,23 @@
 
 package org.apache.skywalking.oap.server.core.remote.client;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import 
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Test;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * @author peng-yongsheng
@@ -46,8 +53,6 @@ public class RemoteClientManagerTestCase {
         ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
         
clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class,
 clusterNodesQuery);
 
-        StreamDataMappingGetter streamDataMappingGetter = 
mock(StreamDataMappingGetter.class);
-        
coreModuleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
 streamDataMappingGetter);
 
         MetricsCreator metricsCreator = mock(MetricsCreator.class);
         when(metricsCreator.createGauge(any(), any(), any(), 
any())).thenReturn(new GaugeMetrics() {
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
index 1baa2c0..6cfcd37 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage;
 
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.CoreModuleProvider;
-import org.apache.skywalking.oap.server.core.remote.define.StreamDataMapping;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
 import org.apache.skywalking.oap.server.library.client.Client;
@@ -37,7 +36,6 @@ public class StorageInstallerTestCase {
 
     @Test
     public void testInstall() throws StorageException, 
ServiceNotProvidedException {
-        StreamDataMapping streamDataMapping = new StreamDataMapping();
         CoreModuleProvider moduleProvider = 
Mockito.mock(CoreModuleProvider.class);
         CoreModule moduleDefine = Mockito.spy(CoreModule.class);
         ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
@@ -45,7 +43,6 @@ public class StorageInstallerTestCase {
         Whitebox.setInternalState(moduleDefine, "loadedProvider", 
moduleProvider);
 
         
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
-        
Mockito.when(moduleProvider.getService(StreamDataMapping.class)).thenReturn(streamDataMapping);
 
 //        streamDataMapping.generate();
 

Reply via email to