This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new d256fc3 OAP internal RemoteService protocol change and code refactor
(#3128)
d256fc3 is described below
commit d256fc348e727549cebf6d4f42ba88458ab771ab
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Sat Jul 20 23:47:17 2019 +0800
OAP internal RemoteService protocol change and code refactor (#3128)
* Remove the worker id, and add worker name for remote handler only.
* Remote metrics and inventory classes mapping too.
* Refactor codes.
---
.../skywalking/oap/server/core/CoreModule.java | 39 ++++++++----
.../oap/server/core/CoreModuleProvider.java | 74 ++++++++++++++++------
.../core/analysis/worker/MetricsRemoteWorker.java | 11 ++--
.../analysis/worker/MetricsStreamProcessor.java | 35 ++++++----
.../core/analysis/worker/MetricsTransWorker.java | 15 +++--
.../register/worker/InventoryStreamProcessor.java | 33 ++++++----
.../register/worker/RegisterPersistentWorker.java | 20 ++++--
.../core/register/worker/RegisterRemoteWorker.java | 8 +--
.../server/core/remote/RemoteSenderService.java | 8 +--
.../server/core/remote/RemoteServiceHandler.java | 44 +++++++------
.../core/remote/client/GRPCRemoteClient.java | 39 ++++++------
.../server/core/remote/client/RemoteClient.java | 2 +-
.../core/remote/client/RemoteClientManager.java | 40 ++++++------
.../core/remote/client/SelfRemoteClient.java | 11 ++--
.../core/remote/define/StreamDataMapping.java | 72 ---------------------
.../remote/define/StreamDataMappingGetter.java | 32 ----------
.../oap/server/core/worker/AbstractWorker.java | 5 --
.../server/core/worker/IWorkerInstanceGetter.java | 3 +-
.../server/core/worker/IWorkerInstanceSetter.java | 4 +-
.../RemoteHandleWorker.java} | 14 ++--
.../server/core/worker/WorkerInstancesService.java | 31 +++++----
.../server-core/src/main/proto/RemoteService.proto | 3 +-
.../core/remote/RemoteServiceHandlerTestCase.java | 41 ++++++------
.../remote/client/GRPCRemoteClientRealClient.java | 28 +++-----
.../remote/client/GRPCRemoteClientRealServer.java | 6 +-
.../remote/client/GRPCRemoteClientTestCase.java | 38 +++++------
.../remote/client/RemoteClientManagerTestCase.java | 23 ++++---
.../core/storage/StorageInstallerTestCase.java | 3 -
28 files changed, 341 insertions(+), 341 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index f31445c..4bfef5a 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -18,18 +18,37 @@
package org.apache.skywalking.oap.server.core;
-import java.util.*;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.config.*;
-import org.apache.skywalking.oap.server.core.query.*;
-import org.apache.skywalking.oap.server.core.register.service.*;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import
org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.LogQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import
org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
-import org.apache.skywalking.oap.server.core.remote.define.*;
-import org.apache.skywalking.oap.server.core.server.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
-import org.apache.skywalking.oap.server.core.storage.model.*;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
@@ -82,8 +101,6 @@ public class CoreModule extends ModuleDefine {
classes.add(IModelSetter.class);
classes.add(IModelGetter.class);
classes.add(IModelOverride.class);
- classes.add(StreamDataMappingGetter.class);
- classes.add(StreamDataMappingSetter.class);
classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 2c8f78a..f05b776 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -20,26 +20,65 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
+import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import org.apache.skywalking.oap.server.core.config.*;
+import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import
org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import
org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader;
-import org.apache.skywalking.oap.server.core.query.*;
-import org.apache.skywalking.oap.server.core.register.service.*;
-import org.apache.skywalking.oap.server.core.remote.*;
-import org.apache.skywalking.oap.server.core.remote.client.*;
-import org.apache.skywalking.oap.server.core.remote.define.*;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.LogQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
+import
org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import
org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
-import org.apache.skywalking.oap.server.core.server.*;
-import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
+import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
-import org.apache.skywalking.oap.server.core.worker.*;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
@@ -56,7 +95,6 @@ public class CoreModuleProvider extends ModuleProvider {
private RemoteClientManager remoteClientManager;
private final AnnotationScan annotationScan;
private final StorageModels storageModels;
- private final StreamDataMapping streamDataMapping;
private final SourceReceiverImpl receiver;
private StreamAnnotationListener streamAnnotationListener;
private OALEngine oalEngine;
@@ -65,7 +103,6 @@ public class CoreModuleProvider extends ModuleProvider {
super();
this.moduleConfig = new CoreModuleConfig();
this.annotationScan = new AnnotationScan();
- this.streamDataMapping = new StreamDataMapping();
this.storageModels = new StorageModels();
this.receiver = new SourceReceiverImpl();
}
@@ -129,9 +166,6 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(SourceReceiver.class, receiver);
- this.registerServiceImplementation(StreamDataMappingGetter.class,
streamDataMapping);
- this.registerServiceImplementation(StreamDataMappingSetter.class,
streamDataMapping);
-
WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class,
instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class,
instancesService);
@@ -178,8 +212,6 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.scan();
oalEngine.notifyAllListeners();
-
- streamDataMapping.init();
} catch (IOException | IllegalAccessException | InstantiationException
e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
index 7ce9311..392d9b2 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsRemoteWorker.java
@@ -33,21 +33,18 @@ public class MetricsRemoteWorker extends
AbstractWorker<Metrics> {
private static final Logger logger =
LoggerFactory.getLogger(MetricsRemoteWorker.class);
- private final AbstractWorker<Metrics> nextWorker;
private final RemoteSenderService remoteSender;
- private final String modelName;
+ private final String remoteReceiverWorkerName;
- MetricsRemoteWorker(ModuleDefineHolder moduleDefineHolder,
AbstractWorker<Metrics> nextWorker,
- String modelName) {
+ MetricsRemoteWorker(ModuleDefineHolder moduleDefineHolder, String
remoteReceiverWorkerName) {
super(moduleDefineHolder);
this.remoteSender =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
- this.nextWorker = nextWorker;
- this.modelName = modelName;
+ this.remoteReceiverWorkerName = remoteReceiverWorkerName;
}
@Override public final void in(Metrics metrics) {
try {
- remoteSender.send(nextWorker.getWorkerId(), metrics,
Selector.HashCode);
+ remoteSender.send(remoteReceiverWorkerName, metrics,
Selector.HashCode);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index dfd4b20..78d0398 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -18,16 +18,26 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
@@ -67,9 +77,6 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
IModelSetter modelSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
DownsamplingConfigService configService =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(DownsamplingConfigService.class);
- StreamDataMappingSetter streamDataMappingSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
- streamDataMappingSetter.putIfAbsent(metricsClass);
-
MetricsPersistentWorker hourPersistentWorker = null;
MetricsPersistentWorker dayPersistentWorker = null;
MetricsPersistentWorker monthPersistentWorker = null;
@@ -91,13 +98,19 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
MetricsPersistentWorker minutePersistentWorker =
minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
MetricsTransWorker transWorker = new
MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker,
hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
- MetricsRemoteWorker remoteWorker = new
MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name());
+
+ String remoteReceiverWorkerName = stream.name() + "_rec";
+ IWorkerInstanceSetter workerInstanceSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
+ workerInstanceSetter.put(remoteReceiverWorkerName, transWorker,
metricsClass);
+
+ MetricsRemoteWorker remoteWorker = new
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new
MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
entryWorkers.put(metricsClass, aggregateWorker);
}
- private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder
moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
+ private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder
moduleDefineHolder,
+ IMetricsDAO metricsDAO, Model model) {
AlarmNotifyWorker alarmNotifyWorker = new
AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
index e4d2520..cade9b9 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
@@ -23,8 +23,11 @@ import
org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
@@ -38,10 +41,10 @@ public class MetricsTransWorker extends
AbstractWorker<Metrics> {
private final MetricsPersistentWorker dayPersistenceWorker;
private final MetricsPersistentWorker monthPersistenceWorker;
- private CounterMetrics aggregationMinCounter;
- private CounterMetrics aggregationHourCounter;
- private CounterMetrics aggregationDayCounter;
- private CounterMetrics aggregationMonthCounter;
+ private final CounterMetrics aggregationMinCounter;
+ private final CounterMetrics aggregationHourCounter;
+ private final CounterMetrics aggregationDayCounter;
+ private final CounterMetrics aggregationMonthCounter;
public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String
modelName,
MetricsPersistentWorker minutePersistenceWorker,
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 4f9f3f7..ed47c80 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -18,14 +18,23 @@
package org.apache.skywalking.oap.server.core.register.worker;
-import java.util.*;
-import org.apache.skywalking.oap.server.core.*;
-import org.apache.skywalking.oap.server.core.analysis.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingSetter;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.*;
+import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
@@ -46,7 +55,8 @@ public class InventoryStreamProcessor implements
StreamProcessor<RegisterSource>
}
@SuppressWarnings("unchecked")
- public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
Class<? extends RegisterSource> inventoryClass) {
+ public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
+ Class<? extends RegisterSource> inventoryClass) {
StorageDAO storageDAO =
moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO;
try {
@@ -58,12 +68,13 @@ public class InventoryStreamProcessor implements
StreamProcessor<RegisterSource>
IModelSetter modelSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass,
stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None),
false);
- StreamDataMappingSetter streamDataMappingSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
- streamDataMappingSetter.putIfAbsent(inventoryClass);
-
RegisterPersistentWorker persistentWorker = new
RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO,
stream.scopeId());
- RegisterRemoteWorker remoteWorker = new
RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
+ String remoteReceiverWorkerName = stream.name() + "_rec";
+ IWorkerInstanceSetter workerInstanceSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
+ workerInstanceSetter.put(remoteReceiverWorkerName, persistentWorker,
inventoryClass);
+
+ RegisterRemoteWorker remoteWorker = new
RegisterRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
RegisterDistinctWorker distinctWorker = new
RegisterDistinctWorker(moduleDefineHolder, remoteWorker);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 5cd5079..2fb3101 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -18,17 +18,27 @@
package org.apache.skywalking.oap.server.core.register.worker;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
+import
org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
+import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
index 513a376..8366186 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterRemoteWorker.java
@@ -33,18 +33,18 @@ public class RegisterRemoteWorker extends
AbstractWorker<RegisterSource> {
private static final Logger logger =
LoggerFactory.getLogger(RegisterRemoteWorker.class);
- private final AbstractWorker<RegisterSource> nextWorker;
+ private final String remoteReceiverWorkerName;
private final RemoteSenderService remoteSender;
- RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder,
AbstractWorker<RegisterSource> nextWorker) {
+ RegisterRemoteWorker(ModuleDefineHolder moduleDefineHolder, String
remoteReceiverWorkerName) {
super(moduleDefineHolder);
this.remoteSender =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(RemoteSenderService.class);
- this.nextWorker = nextWorker;
+ this.remoteReceiverWorkerName = remoteReceiverWorkerName;
}
@Override public final void in(RegisterSource registerSource) {
try {
- remoteSender.send(nextWorker.getWorkerId(), registerSource,
Selector.ForeverFirst);
+ remoteSender.send(remoteReceiverWorkerName, registerSource,
Selector.ForeverFirst);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
index a028546..436ecd2 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
@@ -41,22 +41,22 @@ public class RemoteSenderService implements Service {
this.rollingSelector = new RollingSelector();
}
- public void send(int nextWorkId, StreamData streamData, Selector selector)
{
+ public void send(String nextWorkName, StreamData streamData, Selector
selector) {
RemoteClientManager clientManager =
moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
RemoteClient remoteClient;
switch (selector) {
case HashCode:
remoteClient =
hashCodeSelector.select(clientManager.getRemoteClient(), streamData);
- remoteClient.push(nextWorkId, streamData);
+ remoteClient.push(nextWorkName, streamData);
break;
case Rolling:
remoteClient =
rollingSelector.select(clientManager.getRemoteClient(), streamData);
- remoteClient.push(nextWorkId, streamData);
+ remoteClient.push(nextWorkName, streamData);
break;
case ForeverFirst:
remoteClient =
foreverFirstSelector.select(clientManager.getRemoteClient(), streamData);
- remoteClient.push(nextWorkId, streamData);
+ remoteClient.push(nextWorkName, streamData);
break;
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 2fd5887..17d546e 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -22,14 +22,22 @@ import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.RemoteHandleWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class is Server-side streaming RPC implementation. It's a common
service for OAP servers to receive message from
@@ -43,10 +51,10 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
private static final Logger logger =
LoggerFactory.getLogger(RemoteServiceHandler.class);
private final ModuleDefineHolder moduleDefineHolder;
- private StreamDataMappingGetter streamDataMappingGetter;
private IWorkerInstanceGetter workerInstanceGetter;
private CounterMetrics remoteInCounter;
private CounterMetrics remoteInErrorCounter;
+ private CounterMetrics remoteInTargetNotFoundCounter;
private HistogramMetrics remoteInHistogram;
public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
@@ -58,20 +66,15 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
remoteInErrorCounter =
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
.createCounter("remote_in_error_count", "The error number(server
side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ remoteInTargetNotFoundCounter =
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
+ .createCounter("remote_in_target_not_found_count", "The error
number(server side) of inside remote handler target worker not found. May be
caused by unmatched OAL scrips.",
+ MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
remoteInHistogram =
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
.createHistogramMetric("remote_in_latency", "The latency(server
side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty>
responseObserver) {
- if (Objects.isNull(streamDataMappingGetter)) {
- synchronized (RemoteServiceHandler.class) {
- if (Objects.isNull(streamDataMappingGetter)) {
- streamDataMappingGetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
- }
- }
- }
-
if (Objects.isNull(workerInstanceGetter)) {
synchronized (RemoteServiceHandler.class) {
if (Objects.isNull(workerInstanceGetter)) {
@@ -85,15 +88,20 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
remoteInCounter.inc();
HistogramMetrics.Timer timer = remoteInHistogram.createTimer();
try {
- int streamDataId = message.getStreamDataId();
- int nextWorkerId = message.getNextWorkerId();
+ String nextWorkerName = message.getNextWorkerName();
RemoteData remoteData = message.getRemoteData();
- Class<? extends StreamData> streamDataClass =
streamDataMappingGetter.findClassById(streamDataId);
try {
- StreamData streamData = streamDataClass.newInstance();
+ RemoteHandleWorker handleWorker =
workerInstanceGetter.get(nextWorkerName);
+ AbstractWorker nextWorker = handleWorker.getWorker();
+ StreamData streamData =
handleWorker.getStreamDataClass().newInstance();
streamData.deserialize(remoteData);
- workerInstanceGetter.get(nextWorkerId).in(streamData);
+ if (nextWorker != null) {
+ nextWorker.in(streamData);
+ } else {
+ remoteInTargetNotFoundCounter.inc();
+ logger.warn("Work name [{}] not found. Check OAL
script, make sure they are same in the whole cluster.", nextWorkerName);
+ }
} catch (Throwable t) {
remoteInErrorCounter.inc();
logger.error(t.getMessage(), t);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 4927b97..fd80c9c 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -20,24 +20,29 @@ package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
-import java.util.*;
+import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This is a wrapper of the gRPC client for sending message to each other OAP
server.
- * It contains a block queue to buffering the message and sending the message
by batch.
+ * This is a wrapper of the gRPC client for sending message to each other OAP
server. It contains a block queue to
+ * buffering the message and sending the message by batch.
*
* @author peng-yongsheng
*/
@@ -48,7 +53,6 @@ public class GRPCRemoteClient implements RemoteClient {
private final int channelSize;
private final int bufferSize;
private final Address address;
- private final StreamDataMappingGetter streamDataMappingGetter;
private final AtomicInteger concurrentStreamObserverNumber = new
AtomicInteger(0);
private GRPCClient client;
private DataCarrier<RemoteMessage> carrier;
@@ -56,10 +60,8 @@ public class GRPCRemoteClient implements RemoteClient {
private CounterMetrics remoteOutCounter;
private CounterMetrics remoteOutErrorCounter;
-
- public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder,
StreamDataMappingGetter streamDataMappingGetter, Address address, int
channelSize,
+ public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address
address, int channelSize,
int bufferSize) {
- this.streamDataMappingGetter = streamDataMappingGetter;
this.address = address;
this.channelSize = channelSize;
this.bufferSize = bufferSize;
@@ -119,14 +121,12 @@ public class GRPCRemoteClient implements RemoteClient {
/**
* Push stream data which need to send to another OAP server.
*
- * @param nextWorkerId the id of a worker which will process this stream
data.
+ * @param nextWorkerName the name of a worker which will process this
stream data.
* @param streamData the entity contains the values.
*/
- @Override public void push(int nextWorkerId, StreamData streamData) {
- int streamDataId =
streamDataMappingGetter.findIdByClass(streamData.getClass());
+ @Override public void push(String nextWorkerName, StreamData streamData) {
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
- builder.setNextWorkerId(nextWorkerId);
- builder.setStreamDataId(streamDataId);
+ builder.setNextWorkerName(nextWorkerName);
builder.setRemoteData(streamData.serialize());
this.getDataCarrier().produce(builder.build());
@@ -159,9 +159,8 @@ public class GRPCRemoteClient implements RemoteClient {
}
/**
- * Create a gRPC stream observer to sending stream data, one stream
observer
- * could send multiple stream data by a single consume.
- * The max number of concurrency allowed at the same time is 10.
+ * Create a gRPC stream observer to sending stream data, one stream
observer could send multiple stream data by a
+ * single consume. The max number of concurrency allowed at the same time
is 10.
*
* @return stream observer
*/
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
index ccc216f..3330f3d 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClient.java
@@ -31,5 +31,5 @@ public interface RemoteClient extends
Comparable<RemoteClient> {
void close();
- void push(int nextWorkerId, StreamData streamData);
+ void push(String nextWorkerName, StreamData streamData);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 9bfd4d1..06a963f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -18,15 +18,28 @@
package org.apache.skywalking.oap.server.core.remote.client;
-import java.util.*;
-import java.util.concurrent.*;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.library.module.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class manages the connections between OAP servers. There is a task
schedule that will automatically query a
@@ -39,7 +52,6 @@ public class RemoteClientManager implements Service {
private static final Logger logger =
LoggerFactory.getLogger(RemoteClientManager.class);
private final ModuleDefineHolder moduleDefineHolder;
- private StreamDataMappingGetter streamDataMappingGetter;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
@@ -76,14 +88,6 @@ public class RemoteClientManager implements Service {
}
}
- if (Objects.isNull(streamDataMappingGetter)) {
- synchronized (RemoteClientManager.class) {
- if (Objects.isNull(streamDataMappingGetter)) {
- this.streamDataMappingGetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
- }
- }
- }
-
if (logger.isDebugEnabled()) {
logger.debug("Refresh remote nodes collection.");
}
@@ -199,7 +203,7 @@ public class RemoteClientManager implements Service {
RemoteClient client = new
SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client);
} else {
- RemoteClient client = new
GRPCRemoteClient(moduleDefineHolder, streamDataMappingGetter, address, 1, 3000);
+ RemoteClient client = new
GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
client.connect();
getFreeClients().add(client);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
index 67827a9..110d9e9 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/SelfRemoteClient.java
@@ -18,12 +18,15 @@
package org.apache.skywalking.oap.server.core.remote.client;
-import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
* @author peng-yongsheng
@@ -53,8 +56,8 @@ public class SelfRemoteClient implements RemoteClient {
throw new UnexpectedException("Self remote client invoked to close.");
}
- @Override public void push(int nextWorkerId, StreamData streamData) {
- workerInstanceGetter.get(nextWorkerId).in(streamData);
+ @Override public void push(String nextWorkerName, StreamData streamData) {
+ workerInstanceGetter.get(nextWorkerName).getWorker().in(streamData);
}
@Override public int compareTo(RemoteClient o) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
deleted file mode 100644
index 00ef25b..0000000
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMapping.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.remote.define;
-
-import java.util.*;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-
-/**
- * @author peng-yongsheng
- */
-public class StreamDataMapping implements StreamDataMappingGetter,
StreamDataMappingSetter {
- private List<Class<? extends StreamData>> streamClassList;
- private final Map<Class<? extends StreamData>, Integer> classMap;
- private final Map<Integer, Class<? extends StreamData>> idMap;
-
- public StreamDataMapping() {
- streamClassList = new ArrayList<>();
- this.classMap = new HashMap<>();
- this.idMap = new HashMap<>();
- }
-
- @Override public synchronized void putIfAbsent(Class<? extends StreamData>
streamDataClass) {
- if (classMap.containsKey(streamDataClass)) {
- return;
- }
-
- streamClassList.add(streamDataClass);
- }
-
- public void init() {
- /**
- * The stream protocol use this list order to assign the ID,
- * which is used in across node communication. This order must be
certain.
- */
- Collections.sort(streamClassList, new Comparator<Class>() {
- @Override public int compare(Class streamClass1, Class
streamClass2) {
- return
streamClass1.getName().compareTo(streamClass2.getName());
- }
- });
-
- for (int i = 0; i < streamClassList.size(); i++) {
- Class<? extends StreamData> streamClass = streamClassList.get(i);
- int streamId = i + 1;
- classMap.put(streamClass, streamId);
- idMap.put(streamId, streamClass);
- }
- }
-
- @Override public int findIdByClass(Class<? extends StreamData>
streamDataClass) {
- return classMap.get(streamDataClass);
- }
-
- @Override public Class<? extends StreamData> findClassById(int id) {
- return idMap.get(id);
- }
-}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
deleted file mode 100644
index fffebaf..0000000
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingGetter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.remote.define;
-
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.library.module.Service;
-
-/**
- * @author peng-yongsheng
- */
-public interface StreamDataMappingGetter extends Service {
-
- int findIdByClass(Class<? extends StreamData> streamDataClass);
-
- Class<? extends StreamData> findClassById(int id);
-}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
index c68b690..b513d38 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
@@ -19,21 +19,16 @@
package org.apache.skywalking.oap.server.core.worker;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorker<INPUT> {
-
- @Getter private final int workerId;
@Getter private final ModuleDefineHolder moduleDefineHolder;
public AbstractWorker(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
- IWorkerInstanceSetter workerInstanceSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
- this.workerId = workerInstanceSetter.put(this);
}
public abstract void in(INPUT input);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
index 9b54184..86f73f0 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceGetter.java
@@ -25,5 +25,6 @@ import
org.apache.skywalking.oap.server.library.module.Service;
*/
public interface IWorkerInstanceGetter extends Service {
- AbstractWorker get(int workerId);
+ RemoteHandleWorker get(String nextWorkerName);
+
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
index eef279f..c6da429 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
@@ -18,12 +18,12 @@
package org.apache.skywalking.oap.server.core.worker;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface IWorkerInstanceSetter extends Service {
-
- int put(AbstractWorker instance);
+ void put(String remoteReceiverWorkName, AbstractWorker instance, Class<?
extends StreamData> streamDataClass);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
similarity index 75%
rename from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
rename to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
index dfff820..270c9d6 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/define/StreamDataMappingSetter.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
@@ -16,14 +16,18 @@
*
*/
-package org.apache.skywalking.oap.server.core.remote.define;
+package org.apache.skywalking.oap.server.core.worker;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import org.apache.skywalking.oap.server.library.module.Service;
/**
- * @author peng-yongsheng
+ * @author wusheng
*/
-public interface StreamDataMappingSetter extends Service {
- void putIfAbsent(Class<? extends StreamData> streamDataClass);
+@AllArgsConstructor
+@Getter
+public class RemoteHandleWorker {
+ private AbstractWorker worker;
+ private Class<? extends StreamData> streamDataClass;
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
index 51d671a..0852a6a 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
@@ -18,28 +18,37 @@
package org.apache.skywalking.oap.server.core.worker;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * @author peng-yongsheng
+ * Worker Instance Service hosts all remote handler workers with the stream
data type.
+ *
+ * @author peng-yongsheng, wusheng
*/
public class WorkerInstancesService implements IWorkerInstanceSetter,
IWorkerInstanceGetter {
+ private static final Logger logger =
LoggerFactory.getLogger(WorkerInstancesService.class);
- private final AtomicInteger generator = new AtomicInteger(1);
- private final Map<Integer, AbstractWorker> instances;
+ private final Map<String, RemoteHandleWorker> instances;
public WorkerInstancesService() {
this.instances = new HashMap<>();
}
- @Override public AbstractWorker get(int workerId) {
- return instances.get(workerId);
+ @Override public RemoteHandleWorker get(String nextWorkerName) {
+ return instances.get(nextWorkerName);
}
- @Override public int put(AbstractWorker instance) {
- int workerId = generator.getAndIncrement();
- instances.put(workerId, instance);
- return workerId;
+ @Override public void put(String remoteReceiverWorkName, AbstractWorker
instance,
+ Class<? extends StreamData> streamDataClass) {
+ if (instances.containsKey(remoteReceiverWorkName)) {
+ throw new UnexpectedException("Duplicate worker name:" +
remoteReceiverWorkName);
+ }
+ instances.put(remoteReceiverWorkName, new RemoteHandleWorker(instance,
streamDataClass));
+ logger.debug("Worker {} has been registered as {}",
instance.toString(), remoteReceiverWorkName);
}
}
diff --git a/oap-server/server-core/src/main/proto/RemoteService.proto
b/oap-server/server-core/src/main/proto/RemoteService.proto
index 853ea82..73ed7a6 100644
--- a/oap-server/server-core/src/main/proto/RemoteService.proto
+++ b/oap-server/server-core/src/main/proto/RemoteService.proto
@@ -27,8 +27,7 @@ service RemoteService {
}
message RemoteMessage {
- int32 nextWorkerId = 1;
- int32 streamDataId = 2;
+ string nextWorkerName = 1;
RemoteData remoteData = 3;
}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
index b9081d1..50d1336 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
@@ -18,22 +18,34 @@
package org.apache.skywalking.oap.server.core.remote;
-import io.grpc.inprocess.*;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
+import
org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.module.*;
+import
org.apache.skywalking.oap.server.library.module.DuplicateProviderException;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+import
org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
-
-import static org.mockito.Mockito.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
@@ -46,18 +58,12 @@ public class RemoteServiceHandlerTestCase {
@Test
public void callTest() throws DuplicateProviderException,
ProviderNotFoundException, IOException {
final int streamDataClassId = 1;
- final int testWorkerId = 1;
+ final String testWorkerId = "mock-worker";
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
- StreamDataMappingGetter classGetter =
mock(StreamDataMappingGetter.class);
- Class dataClass = TestRemoteData.class;
-
when(classGetter.findClassById(streamDataClassId)).thenReturn(dataClass);
-
-
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
classGetter);
-
String serverName = InProcessServerBuilder.generateName();
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createCounter(any(), any(), any(),
any())).thenReturn(new CounterMetrics() {
@@ -101,8 +107,7 @@ public class RemoteServiceHandlerTestCase {
});
RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
- remoteMessage.setStreamDataId(streamDataClassId);
- remoteMessage.setNextWorkerId(testWorkerId);
+ remoteMessage.setNextWorkerName(testWorkerId);
RemoteData.Builder remoteData = RemoteData.newBuilder();
remoteData.addDataStrings("test1");
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
index be34824..7f506f9 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java
@@ -19,17 +19,21 @@
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
@@ -53,29 +57,17 @@ public class GRPCRemoteClientRealClient {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class,
metricsCreator);
- GRPCRemoteClient remoteClient = spy(new
GRPCRemoteClient(moduleManager, new TestMappingGetter(), address, 1, 10));
+ GRPCRemoteClient remoteClient = spy(new
GRPCRemoteClient(moduleManager, address, 1, 10));
remoteClient.connect();
for (int i = 0; i < 10000; i++) {
- remoteClient.push(1, new TestStreamData());
+ remoteClient.push("mock_remote", new TestStreamData());
TimeUnit.SECONDS.sleep(1);
}
TimeUnit.MINUTES.sleep(10);
}
- public static class TestMappingGetter implements StreamDataMappingGetter {
-
- @Override public int findIdByClass(Class streamDataClass) {
- return 1;
- }
-
- @Override public Class<StreamData> findClassById(int id) {
- Class<?> clazz = TestStreamData.class;
- return (Class<StreamData>)clazz;
- }
- }
-
public static class TestStreamData extends StreamData {
private long value;
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
index 51945f2..a0c2113 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealServer.java
@@ -21,10 +21,10 @@ package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
-import org.apache.skywalking.oap.server.testing.module.*;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
/**
* @author peng-yongsheng
@@ -36,8 +36,6 @@ public class GRPCRemoteClientRealServer {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
-
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
new GRPCRemoteClientRealClient.TestMappingGetter());
-
GRPCServer server = new GRPCServer("localhost", 10000);
server.initialize();
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
index 8a6494f..3513ca3 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -22,26 +22,36 @@ import io.grpc.testing.GrpcServerRule;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
+import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
+import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
-
-import static org.mockito.Mockito.*;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClientTestCase {
- private final int nextWorkerId = 1;
+ private final String nextWorkerId = "mock-worker";
private ModuleManagerTesting moduleManager;
- private StreamDataMappingGetter classGetter;
@Rule public final GrpcServerRule grpcServerRule = new
GrpcServerRule().directExecutor();
@Before
@@ -50,9 +60,6 @@ public class GRPCRemoteClientTestCase {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
- classGetter = mock(StreamDataMappingGetter.class);
-
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
classGetter);
-
WorkerInstancesService workerInstancesService = new
WorkerInstancesService();
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class,
workerInstancesService);
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class,
workerInstancesService);
@@ -79,16 +86,11 @@ public class GRPCRemoteClientTestCase {
grpcServerRule.getServiceRegistry().addService(new
RemoteServiceHandler(moduleManager));
Address address = new Address("not-important", 11, false);
- GRPCRemoteClient remoteClient = spy(new
GRPCRemoteClient(moduleManager, classGetter, address, 1, 10));
+ GRPCRemoteClient remoteClient = spy(new
GRPCRemoteClient(moduleManager, address, 1, 10));
remoteClient.connect();
doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
- when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
-
- Class dataClass = TestStreamData.class;
- when(classGetter.findClassById(1)).thenReturn(dataClass);
-
for (int i = 0; i < 12; i++) {
remoteClient.push(nextWorkerId, new TestStreamData());
}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
index 2741af2..e83aa63 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -18,16 +18,23 @@
package org.apache.skywalking.oap.server.core.remote.client;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import
org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.apache.skywalking.oap.server.testing.module.*;
-import org.junit.*;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
+import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
+import org.junit.Assert;
+import org.junit.Test;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* @author peng-yongsheng
@@ -46,8 +53,6 @@ public class RemoteClientManagerTestCase {
ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class,
clusterNodesQuery);
- StreamDataMappingGetter streamDataMappingGetter =
mock(StreamDataMappingGetter.class);
-
coreModuleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class,
streamDataMappingGetter);
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createGauge(any(), any(), any(),
any())).thenReturn(new GaugeMetrics() {
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
index 1baa2c0..6cfcd37 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
-import org.apache.skywalking.oap.server.core.remote.define.StreamDataMapping;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
@@ -37,7 +36,6 @@ public class StorageInstallerTestCase {
@Test
public void testInstall() throws StorageException,
ServiceNotProvidedException {
- StreamDataMapping streamDataMapping = new StreamDataMapping();
CoreModuleProvider moduleProvider =
Mockito.mock(CoreModuleProvider.class);
CoreModule moduleDefine = Mockito.spy(CoreModule.class);
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
@@ -45,7 +43,6 @@ public class StorageInstallerTestCase {
Whitebox.setInternalState(moduleDefine, "loadedProvider",
moduleProvider);
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
-
Mockito.when(moduleProvider.getService(StreamDataMapping.class)).thenReturn(streamDataMapping);
// streamDataMapping.generate();