This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch workid2name
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/workid2name by this push:
new e8caf4b Remote metrics and inventory classes mapping too.
e8caf4b is described below
commit e8caf4bcebd62f719cadb9b4cc347501d72acc10
Author: Wu Sheng <[email protected]>
AuthorDate: Sat Jul 20 21:05:20 2019 +0800
Remote metrics and inventory classes mapping too.
---
.../skywalking/oap/server/core/CoreModule.java | 39 ++++++++----
.../oap/server/core/CoreModuleProvider.java | 74 ++++++++++++++++------
.../analysis/worker/MetricsStreamProcessor.java | 27 ++++----
.../core/analysis/worker/MetricsTransWorker.java | 30 +++++++--
.../register/worker/InventoryStreamProcessor.java | 28 ++++----
.../register/worker/RegisterPersistentWorker.java | 21 +++++-
.../server/core/remote/RemoteServiceHandler.java | 34 ++++------
.../core/remote/client/GRPCRemoteClient.java | 35 +++++-----
.../core/remote/client/RemoteClientManager.java | 40 ++++++------
.../core/remote/define/StreamDataMapping.java | 72 ---------------------
.../remote/define/StreamDataMappingSetter.java | 29 ---------
.../IRemoteHandleWorker.java} | 25 +++++---
.../server/core/worker/WorkerInstancesService.java | 9 ++-
.../server-core/src/main/proto/RemoteService.proto | 3 +-
.../core/remote/RemoteServiceHandlerTestCase.java | 39 +++++++-----
.../remote/client/GRPCRemoteClientRealClient.java | 26 +++-----
.../remote/client/GRPCRemoteClientRealServer.java | 6 +-
.../remote/client/GRPCRemoteClientTestCase.java | 36 ++++++-----
.../remote/client/RemoteClientManagerTestCase.java | 23 ++++---
.../core/storage/StorageInstallerTestCase.java | 3 -
20 files changed, 301 insertions(+), 298 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index f31445c..4bfef5a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -18,18 +18,37 @@
package org.apache.skywalking.oap.server.core;
-import java.util.*;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.config.*;
-import org.apache.skywalking.oap.server.core.query.*;
-import org.apache.skywalking.oap.server.core.register.service.*;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import
org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.LogQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import
org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
-import org.apache.skywalking.oap.server.core.remote.define.*;
-import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.storage.model.*;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
@@ -82,8 +101,6 @@ public class CoreModule extends ModuleDefine {
classes.add(IModelSetter.class);
classes.add(IModelGetter.class);
classes.add(IModelOverride.class);
- classes.add(StreamDataMappingGetter.class);
- classes.add(StreamDataMappingSetter.class);
classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 2c8f78a..f05b776 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -20,26 +20,65 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
+import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import
org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import
org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader;
-import org.apache.skywalking.oap.server.core.query.*;
-import org.apache.skywalking.oap.server.core.register.service.*;
-import org.apache.skywalking.oap.server.core.remote.*;
-import org.apache.skywalking.oap.server.core.remote.client.*;
-import org.apache.skywalking.oap.server.core.remote.define.*;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.LogQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import
org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import
org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
-import org.apache.skywalking.oap.server.core.server.*;
-import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
-import org.apache.skywalking.oap.server.core.worker.*;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
@@ -56,7 +95,6 @@ public class CoreModuleProvider extends ModuleProvider {
private RemoteClientManager remoteClientManager;
private final AnnotationScan annotationScan;
private final StorageModels storageModels;
- private final StreamDataMapping streamDataMapping;
private final SourceReceiverImpl receiver;
private StreamAnnotationListener streamAnnotationListener;
private OALEngine oalEngine;
@@ -65,7 +103,6 @@ public class CoreModuleProvider extends ModuleProvider {
super();
this.moduleConfig = new CoreModuleConfig();
this.annotationScan = new AnnotationScan();
- this.streamDataMapping = new StreamDataMapping();
this.storageModels = new StorageModels();
this.receiver = new SourceReceiverImpl();
}
@@ -129,9 +166,6 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(SourceReceiver.class, receiver);
- this.registerServiceImplementation(StreamDataMappingGetter.class,
streamDataMapping);
- this.registerServiceImplementation(StreamDataMappingSetter.class,
streamDataMapping);
-
WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class,
instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class,
instancesService);
@@ -178,8 +212,6 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.scan();
oalEngine.notifyAllListeners();
-
- streamDataMapping.init();
} catch (IOException | IllegalAccessException | InstantiationException
e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 33488d3..3253792 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -18,16 +18,25 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -68,9 +77,6 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
IModelSetter modelSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
DownsamplingConfigService configService =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
- StreamDataMappingSetter streamDataMappingSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
- streamDataMappingSetter.putIfAbsent(metricsClass);
-
MetricsPersistentWorker hourPersistentWorker = null;
MetricsPersistentWorker dayPersistentWorker = null;
MetricsPersistentWorker monthPersistentWorker = null;
@@ -91,8 +97,7 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(),
new Storage(stream.name(), true, true, Downsampling.Minute));
MetricsPersistentWorker minutePersistentWorker =
minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
-
- MetricsTransWorker transWorker = new
MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker,
hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
+ MetricsTransWorker transWorker = new
MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker,
hourPersistentWorker, dayPersistentWorker, monthPersistentWorker, metricsClass);
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
index e4d2520..e10a8e8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
@@ -20,7 +20,10 @@ package
org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IRemoteHandleWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
@@ -29,7 +32,7 @@ import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public class MetricsTransWorker extends AbstractWorker<Metrics> {
+public class MetricsTransWorker extends AbstractWorker<Metrics> implements
IRemoteHandleWorker<Metrics> {
private static final Logger logger =
LoggerFactory.getLogger(MetricsTransWorker.class);
@@ -37,22 +40,25 @@ public class MetricsTransWorker extends
AbstractWorker<Metrics> {
private final MetricsPersistentWorker hourPersistenceWorker;
private final MetricsPersistentWorker dayPersistenceWorker;
private final MetricsPersistentWorker monthPersistenceWorker;
+ private final Class<? extends Metrics> metricsClass;
- private CounterMetrics aggregationMinCounter;
- private CounterMetrics aggregationHourCounter;
- private CounterMetrics aggregationDayCounter;
- private CounterMetrics aggregationMonthCounter;
+ private final CounterMetrics aggregationMinCounter;
+ private final CounterMetrics aggregationHourCounter;
+ private final CounterMetrics aggregationDayCounter;
+ private final CounterMetrics aggregationMonthCounter;
public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String
modelName,
MetricsPersistentWorker minutePersistenceWorker,
MetricsPersistentWorker hourPersistenceWorker,
MetricsPersistentWorker dayPersistenceWorker,
- MetricsPersistentWorker monthPersistenceWorker) {
+ MetricsPersistentWorker monthPersistenceWorker,
+ Class<? extends Metrics> metricsClass) {
super(moduleDefineHolder);
this.minutePersistenceWorker = minutePersistenceWorker;
this.hourPersistenceWorker = hourPersistenceWorker;
this.dayPersistenceWorker = dayPersistenceWorker;
this.monthPersistenceWorker = monthPersistenceWorker;
+ this.metricsClass = metricsClass;
MetricsCreator metricsCreator =
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
aggregationMinCounter =
metricsCreator.createCounter("metrics_aggregation", "The number of rows in
aggregation",
@@ -65,6 +71,18 @@ public class MetricsTransWorker extends
AbstractWorker<Metrics> {
new MetricsTag.Keys("metricName", "level", "dimensionality"), new
MetricsTag.Values(modelName, "2", "month"));
}
+ @Override public Metrics deserialize(RemoteData remoteData) {
+ try {
+ StreamData streamData = metricsClass.newInstance();
+ Metrics metrics = (Metrics)streamData;
+ streamData.deserialize(remoteData);
+ return metrics;
+ } catch (Exception e) {
+ logger.error("Metrics Class " + metricsClass.getName() + " can't
be instantiation.", e);
+ throw new IllegalStateException("Metrics Class " +
metricsClass.getName() + " can't be instantiation.");
+ }
+ }
+
@Override public void in(Metrics metrics) {
if (Objects.nonNull(hourPersistenceWorker)) {
aggregationMonthCounter.inc();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 042a9bc..9341d51 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -18,14 +18,22 @@
package org.apache.skywalking.oap.server.core.register.worker;
-import java.util.*;
-import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -47,7 +55,8 @@ public class InventoryStreamProcessor implements
StreamProcessor<RegisterSource>
}
@SuppressWarnings("unchecked")
- public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
Class<? extends RegisterSource> inventoryClass) {
+ public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
+ Class<? extends RegisterSource> inventoryClass) {
StorageDAO storageDAO =
moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO;
try {
@@ -59,10 +68,7 @@ public class InventoryStreamProcessor implements
StreamProcessor<RegisterSource>
IModelSetter modelSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass,
stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
- StreamDataMappingSetter streamDataMappingSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
- streamDataMappingSetter.putIfAbsent(inventoryClass);
-
- RegisterPersistentWorker persistentWorker = new
RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO,
stream.scopeId());
+ RegisterPersistentWorker persistentWorker = new
RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO,
stream.scopeId(), inventoryClass);
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 5cd5079..bcbe798 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -24,16 +24,19 @@ import
org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IRemoteHandleWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
+public class RegisterPersistentWorker extends AbstractWorker<RegisterSource>
implements IRemoteHandleWorker<RegisterSource> {
private static final Logger logger =
LoggerFactory.getLogger(RegisterPersistentWorker.class);
@@ -43,13 +46,15 @@ public class RegisterPersistentWorker extends
AbstractWorker<RegisterSource> {
private final IRegisterLockDAO registerLockDAO;
private final IRegisterDAO registerDAO;
private final DataCarrier<RegisterSource> dataCarrier;
+ private final Class<? extends RegisterSource> inventoryClass;
RegisterPersistentWorker(ModuleDefineHolder moduleDefineHolder, String
modelName,
- IRegisterDAO registerDAO, int scopeId) {
+ IRegisterDAO registerDAO, int scopeId, Class<? extends RegisterSource>
inventoryClass) {
super(moduleDefineHolder);
this.modelName = modelName;
this.sources = new HashMap<>();
this.registerDAO = registerDAO;
+ this.inventoryClass = inventoryClass;
this.registerLockDAO =
moduleDefineHolder.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
this.scopeId = scopeId;
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." +
modelName, 1, 1000);
@@ -69,6 +74,18 @@ public class RegisterPersistentWorker extends
AbstractWorker<RegisterSource> {
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new
RegisterPersistentWorker.PersistentConsumer(this));
}
+ @Override public RegisterSource deserialize(RemoteData remoteData) {
+ try {
+ StreamData streamData = inventoryClass.newInstance();
+ RegisterSource registerSource = (RegisterSource)streamData;
+ registerSource.deserialize(remoteData);
+ return registerSource;
+ } catch (Exception e) {
+ logger.error("Inventory Class " + inventoryClass.getName() + "
can't be instantiation.", e);
+ throw new IllegalStateException("Inventory Class " +
inventoryClass.getName() + " can't be instantiation.");
+ }
+ }
+
@Override public final void in(RegisterSource registerSource) {
registerSource.setEndOfBatchContext(new EndOfBatchContext(false));
dataCarrier.produce(registerSource);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index f8fb8f7..8d3c438 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -21,16 +21,22 @@ package org.apache.skywalking.oap.server.core.remote;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IRemoteHandleWorker;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class is Server-side streaming RPC implementation. It's a common
service for OAP servers to receive message from
@@ -44,7 +50,6 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
private static final Logger logger =
LoggerFactory.getLogger(RemoteServiceHandler.class);
private final ModuleDefineHolder moduleDefineHolder;
- private StreamDataMappingGetter streamDataMappingGetter;
private IWorkerInstanceGetter workerInstanceGetter;
private CounterMetrics remoteInCounter;
private CounterMetrics remoteInErrorCounter;
@@ -69,14 +74,6 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
}
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty>
responseObserver) {
- if (Objects.isNull(streamDataMappingGetter)) {
- synchronized (RemoteServiceHandler.class) {
- if (Objects.isNull(streamDataMappingGetter)) {
- streamDataMappingGetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
- }
- }
- }
-
if (Objects.isNull(workerInstanceGetter)) {
synchronized (RemoteServiceHandler.class) {
if (Objects.isNull(workerInstanceGetter)) {
@@ -90,17 +87,14 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
remoteInCounter.inc();
HistogramMetrics.Timer timer = remoteInHistogram.createTimer();
try {
- int streamDataId = message.getStreamDataId();
- String nextWorkerName = message.getNextWorkName();
+ String nextWorkerName = message.getNextWorkerName();
RemoteData remoteData = message.getRemoteData();
- Class<? extends StreamData> streamDataClass =
streamDataMappingGetter.findClassById(streamDataId);
try {
- StreamData streamData = streamDataClass.newInstance();
- streamData.deserialize(remoteData);
AbstractWorker nextWorker =
workerInstanceGetter.get(nextWorkerName);
+ IRemoteHandleWorker handleWorker =
(IRemoteHandleWorker)nextWorker;
if (nextWorker != null) {
- nextWorker.in(streamData);
+
nextWorker.in(handleWorker.deserialize(remoteData));
} else {
remoteInTargetNotFoundCounter.inc();
logger.warn("Work name [{}] not found. Check OAL
script, make sure they are same in the whole cluster.", nextWorkerName);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index c4f9e35..fd80c9c 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -20,24 +20,29 @@ package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
-import java.util.*;
+import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This is a wrapper of the gRPC client for sending message to each other OAP
server.
- * It contains a block queue to buffering the message and sending the message
by batch.
+ * This is a wrapper of the gRPC client for sending messages to the other OAP
servers. It contains a block queue for
+ * buffering the messages and sending them in batches.
*
* @author peng-yongsheng
*/
@@ -48,7 +53,6 @@ public class GRPCRemoteClient implements RemoteClient {
private final int channelSize;
private final int bufferSize;
private final Address address;
- private final StreamDataMappingGetter streamDataMappingGetter;
private final AtomicInteger concurrentStreamObserverNumber = new
AtomicInteger(0);
private GRPCClient client;
private DataCarrier<RemoteMessage> carrier;
@@ -56,10 +60,8 @@ public class GRPCRemoteClient implements RemoteClient {
private CounterMetrics remoteOutCounter;
private CounterMetrics remoteOutErrorCounter;
-
- public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder,
StreamDataMappingGetter streamDataMappingGetter, Address address, int
channelSize,
+ public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address
address, int channelSize,
int bufferSize) {
- this.streamDataMappingGetter = streamDataMappingGetter;
this.address = address;
this.channelSize = channelSize;
this.bufferSize = bufferSize;
@@ -123,10 +125,8 @@ public class GRPCRemoteClient implements RemoteClient {
* @param streamData the entity contains the values.
*/
@Override public void push(String nextWorkerName, StreamData streamData) {
- int streamDataId =
streamDataMappingGetter.findIdByClass(streamData.getClass());
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
- builder.setNextWorkName(nextWorkerName);
- builder.setStreamDataId(streamDataId);
+ builder.setNextWorkerName(nextWorkerName);
builder.setRemoteData(streamData.serialize());
this.getDataCarrier().produce(builder.build());
@@ -159,9 +159,8 @@ public class GRPCRemoteClient implements RemoteClient {
}
/**
- * Create a gRPC stream observer to sending stream data, one stream
observer
- * could send multiple stream data by a single consume.
- * The max number of concurrency allowed at the same time is 10.
+ * Create a gRPC stream observer for sending stream data. One stream
observer can send multiple stream data by a
+ * single consume. The maximum concurrency allowed at the same time
is 10.
*
* @return stream observer
*/
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 9bfd4d1..06a963f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -18,15 +18,28 @@
package org.apache.skywalking.oap.server.core.remote.client;
-import java.util.*;
-import java.util.concurrent.*;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.library.module.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class manages the connections between OAP servers. There is a task
schedule that will automatically query a
@@ -39,7 +52,6 @@ public class RemoteClientManager implements Service {
private static final Logger logger =
LoggerFactory.getLogger(RemoteClientManager.class);
private final ModuleDefineHolder moduleDefineHolder;
- private StreamDataMappingGetter streamDataMappingGetter;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
@@ -76,14 +88,6 @@ public class RemoteClientManager implements Service {
}
}
- if (Objects.isNull(streamDataMappingGetter)) {
- synchronized (RemoteClientManager.class) {
- if (Objects.isNull(streamDataMappingGetter)) {
- this.streamDataMappingGetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
- }
- }
- }
-
if (logger.isDebugEnabled()) {
logger.debug("Refresh remote nodes collection.");
}
@@ -199,7 +203,7 @@ public class RemoteClientManager implements Service {
RemoteClient client = new
SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client);
} else {
- RemoteClient client = new
GRPCRemoteClient(moduleDefineHolder, streamDataMappingGetter, address, 1, 3000);
+ RemoteClient client = new
GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
client.connect();
getFreeClients().add(client);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
deleted file mode 100644
index 00ef25b..0000000
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.remote.define;
-
-import java.util.*;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-
-/**
- * @author peng-yongsheng
- */
-public class StreamDataMapping implements StreamDataMappingGetter,
StreamDataMappingSetter {
- private List<Class<? extends StreamData>> streamClassList;
- private final Map<Class<? extends StreamData>, Integer> classMap;
- private final Map<Integer, Class<? extends StreamData>> idMap;
-
- public StreamDataMapping() {
- streamClassList = new ArrayList<>();
- this.classMap = new HashMap<>();
- this.idMap = new HashMap<>();
- }
-
- @Override public synchronized void putIfAbsent(Class<? extends StreamData>
streamDataClass) {
- if (classMap.containsKey(streamDataClass)) {
- return;
- }
-
- streamClassList.add(streamDataClass);
- }
-
- public void init() {
- /**
- * The stream protocol use this list order to assign the ID,
- * which is used in across node communication. This order must be
certain.
- */
- Collections.sort(streamClassList, new Comparator<Class>() {
- @Override public int compare(Class streamClass1, Class
streamClass2) {
- return
streamClass1.getName().compareTo(streamClass2.getName());
- }
- });
-
- for (int i = 0; i < streamClassList.size(); i++) {
- Class<? extends StreamData> streamClass = streamClassList.get(i);
- int streamId = i + 1;
- classMap.put(streamClass, streamId);
- idMap.put(streamId, streamClass);
- }
- }
-
- @Override public int findIdByClass(Class<? extends StreamData>
streamDataClass) {
- return classMap.get(streamDataClass);
- }
-
- @Override public Class<? extends StreamData> findClassById(int id) {
- return idMap.get(id);
- }
-}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
deleted file mode 100644
index dfff820..0000000
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.remote.define;
-
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.library.module.Service;
-
-/**
- * @author peng-yongsheng
- */
-public interface StreamDataMappingSetter extends Service {
- void putIfAbsent(Class<? extends StreamData> streamDataClass);
-}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IRemoteHandleWorker.java
similarity index 60%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IRemoteHandleWorker.java
index fffebaf..015454c 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IRemoteHandleWorker.java
@@ -16,17 +16,24 @@
*
*/
-package org.apache.skywalking.oap.server.core.remote.define;
+package org.apache.skywalking.oap.server.core.worker;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
- * @author peng-yongsheng
+ * Implementations of this interface support deserialization of remote data.
+ *
+ * @param <INPUT> the type of object produced by deserialization, used in OAP internal remote RPC.
+ *
+ * @author wusheng
*/
-public interface StreamDataMappingGetter extends Service {
-
- int findIdByClass(Class<? extends StreamData> streamDataClass);
-
- Class<? extends StreamData> findClassById(int id);
+public interface IRemoteHandleWorker<INPUT> {
+ /**
+ * Deserialize the remote data.
+ *
+ * @param remoteData the data received through gRPC
+ *
+ * @return an instance carrying the data from the remote data.
+ */
+ INPUT deserialize(RemoteData remoteData);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
index 5a80c87..37fb586 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
@@ -21,11 +21,15 @@ package org.apache.skywalking.oap.server.core.worker;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.UnexpectedException;
+import
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * @author peng-yongsheng
+ * Worker Instance Service hosts all remote handler workers, including metrics
and register workers.
+ * All workers of this kind should implement {@link IRemoteHandleWorker} to
adapt {@link RemoteServiceGrpc}
+ *
+ * @author peng-yongsheng, wusheng
*/
public class WorkerInstancesService implements IWorkerInstanceSetter,
IWorkerInstanceGetter {
private static final Logger logger =
LoggerFactory.getLogger(WorkerInstancesService.class);
@@ -44,6 +48,9 @@ public class WorkerInstancesService implements
IWorkerInstanceSetter, IWorkerIns
if (instances.containsKey(remoteReceiverWorkName)) {
throw new UnexpectedException("Duplicate worker name:" +
remoteReceiverWorkName);
}
+ if (!(instance instanceof IRemoteHandleWorker)) {
+ throw new IllegalStateException("Worker " +
instance.getClass().getName() + " must implement IRemoteHandleWorker.");
+ }
instances.put(remoteReceiverWorkName, instance);
logger.debug("Worker {} has been registered as {}",
instance.toString(), remoteReceiverWorkName);
}
diff --git a/oap-server/server-core/src/main/proto/RemoteService.proto
b/oap-server/server-core/src/main/proto/RemoteService.proto
index 885a2b4..73ed7a6 100644
--- a/oap-server/server-core/src/main/proto/RemoteService.proto
+++ b/oap-server/server-core/src/main/proto/RemoteService.proto
@@ -27,8 +27,7 @@ service RemoteService {
}
message RemoteMessage {
- string nextWorkName = 1;
- int32 streamDataId = 2;
+ string nextWorkerName = 1;
RemoteData remoteData = 3;
}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
index 7bd5dfb..50d1336 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
@@ -18,22 +18,34 @@
package org.apache.skywalking.oap.server.core.remote;
-import io.grpc.inprocess.*;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.*;
+import
org.apache.skywalking.oap.server.library.module.DuplicateProviderException;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import
org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
-
-import static org.mockito.Mockito.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
@@ -52,12 +64,6 @@ public class RemoteServiceHandlerTestCase {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
- StreamDataMappingGetter classGetter =
mock(StreamDataMappingGetter.class);
- Class dataClass = TestRemoteData.class;
-
when(classGetter.findClassById(streamDataClassId)).thenReturn(dataClass);
-
-
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
classGetter);
-
String serverName = InProcessServerBuilder.generateName();
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createCounter(any(), any(), any(),
any())).thenReturn(new CounterMetrics() {
@@ -101,8 +107,7 @@ public class RemoteServiceHandlerTestCase {
});
RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
- remoteMessage.setStreamDataId(streamDataClassId);
- remoteMessage.setNextWorkName(testWorkerId);
+ remoteMessage.setNextWorkerName(testWorkerId);
RemoteData.Builder remoteData = RemoteData.newBuilder();
remoteData.addDataStrings("test1");
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
index 262ec18..7f506f9 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -19,17 +19,21 @@
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
@@ -53,7 +57,7 @@ public class GRPCRemoteClientRealClient {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class,
metricsCreator);
- GRPCRemoteClient remoteClient = spy(new
GRPCRemoteClient(moduleManager, new TestMappingGetter(), address, 1, 10));
+ GRPCRemoteClient remoteClient = spy(new
GRPCRemoteClient(moduleManager, address, 1, 10));
remoteClient.connect();
for (int i = 0; i < 10000; i++) {
@@ -64,18 +68,6 @@ public class GRPCRemoteClientRealClient {
TimeUnit.MINUTES.sleep(10);
}
- public static class TestMappingGetter implements StreamDataMappingGetter {
-
- @Override public int findIdByClass(Class streamDataClass) {
- return 1;
- }
-
- @Override public Class<StreamData> findClassById(int id) {
- Class<?> clazz = TestStreamData.class;
- return (Class<StreamData>)clazz;
- }
- }
-
public static class TestStreamData extends StreamData {
private long value;
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
index 51945f2..a0c2113 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
@@ -21,10 +21,10 @@ package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
-import org.apache.skywalking.oap.server.testing.module.*;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
/**
* @author peng-yongsheng
@@ -36,8 +36,6 @@ public class GRPCRemoteClientRealServer {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
-
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
new GRPCRemoteClientRealClient.TestMappingGetter());
-
GRPCServer server = new GRPCServer("localhost", 10000);
server.initialize();
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
index a2281f8..3513ca3 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -22,17 +22,28 @@ import io.grpc.testing.GrpcServerRule;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
-
-import static org.mockito.Mockito.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
@@ -41,7 +52,6 @@ public class GRPCRemoteClientTestCase {
private final String nextWorkerId = "mock-worker";
private ModuleManagerTesting moduleManager;
- private StreamDataMappingGetter classGetter;
@Rule public final GrpcServerRule grpcServerRule = new
GrpcServerRule().directExecutor();
@Before
@@ -50,9 +60,6 @@ public class GRPCRemoteClientTestCase {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
- classGetter = mock(StreamDataMappingGetter.class);
-
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
classGetter);
-
WorkerInstancesService workerInstancesService = new
WorkerInstancesService();
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class,
workerInstancesService);
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class,
workerInstancesService);
@@ -79,16 +86,11 @@ public class GRPCRemoteClientTestCase {
grpcServerRule.getServiceRegistry().addService(new
RemoteServiceHandler(moduleManager));
Address address = new Address("not-important", 11, false);
- GRPCRemoteClient remoteClient = spy(new
GRPCRemoteClient(moduleManager, classGetter, address, 1, 10));
+ GRPCRemoteClient remoteClient = spy(new
GRPCRemoteClient(moduleManager, address, 1, 10));
remoteClient.connect();
doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
- when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
-
- Class dataClass = TestStreamData.class;
- when(classGetter.findClassById(1)).thenReturn(dataClass);
-
for (int i = 0; i < 12; i++) {
remoteClient.push(nextWorkerId, new TestStreamData());
}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
index 2741af2..e83aa63 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -18,16 +18,23 @@
package org.apache.skywalking.oap.server.core.remote.client;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Test;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
@@ -46,8 +53,6 @@ public class RemoteClientManagerTestCase {
ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class,
clusterNodesQuery);
- StreamDataMappingGetter streamDataMappingGetter =
mock(StreamDataMappingGetter.class);
-
coreModuleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
streamDataMappingGetter);
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createGauge(any(), any(), any(),
any())).thenReturn(new GaugeMetrics() {
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
index 1baa2c0..6cfcd37 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
-import org.apache.skywalking.oap.server.core.remote.define.StreamDataMapping;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
@@ -37,7 +36,6 @@ public class StorageInstallerTestCase {
@Test
public void testInstall() throws StorageException,
ServiceNotProvidedException {
- StreamDataMapping streamDataMapping = new StreamDataMapping();
CoreModuleProvider moduleProvider =
Mockito.mock(CoreModuleProvider.class);
CoreModule moduleDefine = Mockito.spy(CoreModule.class);
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
@@ -45,7 +43,6 @@ public class StorageInstallerTestCase {
Whitebox.setInternalState(moduleDefine, "loadedProvider",
moduleProvider);
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
-
Mockito.when(moduleProvider.getService(StreamDataMapping.class)).thenReturn(streamDataMapping);
// streamDataMapping.generate();