This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 8605f4c Change the metrics process flow. (#3157)
8605f4c is described below
commit 8605f4ccc05f13ef466515f2c551c59907b22fab
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Wed Jul 24 18:18:20 2019 +0800
Change the metrics process flow. (#3157)
* Add the database session feature.
* Make the database session configurable.
* Change the metrics process flow.
Before: metrics entrance -> aggregate worker -> remote worker
-> trans worker -> minute, hour, day, and month persistence workers
-> storage
After: metrics entrance -> aggregate worker -> remote worker
-> minute persistence worker -> trans worker
-> hour, day, and month persistence workers -> storage
---
.../analysis/worker/MetricsPersistentWorker.java | 7 ++++++-
.../analysis/worker/MetricsStreamProcessor.java | 14 +++++++-------
.../core/analysis/worker/MetricsTransWorker.java | 22 ++--------------------
3 files changed, 15 insertions(+), 28 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 8c2865a..bbe5f7f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -47,10 +47,11 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
private final AbstractWorker<Metrics> nextAlarmWorker;
private final AbstractWorker<ExportEvent> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
+ private final MetricsTransWorker transWorker;
private final boolean enableDatabaseSession;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model
model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
- AbstractWorker<ExportEvent> nextExportWorker, boolean
enableDatabaseSession) {
+ AbstractWorker<ExportEvent> nextExportWorker, MetricsTransWorker
transWorker, boolean enableDatabaseSession) {
super(moduleDefineHolder);
this.model = model;
this.databaseSession = new HashMap<>(100);
@@ -59,6 +60,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = nextAlarmWorker;
this.nextExportWorker = nextExportWorker;
+ this.transWorker = transWorker;
String name = "METRICS_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
@@ -99,6 +101,9 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
ExportEvent event = new ExportEvent(data,
ExportEvent.EventType.INCREMENT);
nextExportWorker.in(event);
}
+ if (Objects.nonNull(transWorker)) {
+ transWorker.in(data);
+ }
int mod = i % batchGetSize;
if (mod == 0) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 4ee60ab..5da8a3e 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -86,14 +86,14 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
monthPersistentWorker = worker(moduleDefineHolder, metricsDAO,
model);
}
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(),
new Storage(stream.name(), true, true, Downsampling.Minute), false);
- MetricsPersistentWorker minutePersistentWorker =
minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
+ MetricsTransWorker transWorker = new
MetricsTransWorker(moduleDefineHolder, stream.name(), hourPersistentWorker,
dayPersistentWorker, monthPersistentWorker);
- MetricsTransWorker transWorker = new
MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker,
hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(),
new Storage(stream.name(), true, true, Downsampling.Minute), false);
+ MetricsPersistentWorker minutePersistentWorker =
minutePersistentWorker(moduleDefineHolder, metricsDAO, model, transWorker);
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
- workerInstanceSetter.put(remoteReceiverWorkerName, transWorker,
metricsClass);
+ workerInstanceSetter.put(remoteReceiverWorkerName,
minutePersistentWorker, metricsClass);
MetricsRemoteWorker remoteWorker = new
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new
MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
@@ -101,18 +101,18 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
entryWorkers.put(metricsClass, aggregateWorker);
}
- private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder
moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
+ private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder
moduleDefineHolder, IMetricsDAO metricsDAO, Model model, MetricsTransWorker
transWorker) {
AlarmNotifyWorker alarmNotifyWorker = new
AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
- MetricsPersistentWorker minutePersistentWorker = new
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO,
alarmNotifyWorker, exportWorker, enableDatabaseSession);
+ MetricsPersistentWorker minutePersistentWorker = new
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO,
alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
private MetricsPersistentWorker worker(ModuleDefineHolder
moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
- MetricsPersistentWorker persistentWorker = new
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null,
enableDatabaseSession);
+ MetricsPersistentWorker persistentWorker = new
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null,
null, enableDatabaseSession);
persistentWorkers.add(persistentWorker);
return persistentWorker;
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
index cade9b9..01c602d 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsTransWorker.java
@@ -23,11 +23,8 @@ import
org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.telemetry.api.*;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -36,30 +33,24 @@ public class MetricsTransWorker extends
AbstractWorker<Metrics> {
private static final Logger logger =
LoggerFactory.getLogger(MetricsTransWorker.class);
- private final MetricsPersistentWorker minutePersistenceWorker;
private final MetricsPersistentWorker hourPersistenceWorker;
private final MetricsPersistentWorker dayPersistenceWorker;
private final MetricsPersistentWorker monthPersistenceWorker;
- private final CounterMetrics aggregationMinCounter;
private final CounterMetrics aggregationHourCounter;
private final CounterMetrics aggregationDayCounter;
private final CounterMetrics aggregationMonthCounter;
public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String
modelName,
- MetricsPersistentWorker minutePersistenceWorker,
MetricsPersistentWorker hourPersistenceWorker,
MetricsPersistentWorker dayPersistenceWorker,
MetricsPersistentWorker monthPersistenceWorker) {
super(moduleDefineHolder);
- this.minutePersistenceWorker = minutePersistenceWorker;
this.hourPersistenceWorker = hourPersistenceWorker;
this.dayPersistenceWorker = dayPersistenceWorker;
this.monthPersistenceWorker = monthPersistenceWorker;
MetricsCreator metricsCreator =
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
- aggregationMinCounter =
metricsCreator.createCounter("metrics_aggregation", "The number of rows in
aggregation",
- new MetricsTag.Keys("metricName", "level", "dimensionality"), new
MetricsTag.Values(modelName, "2", "min"));
aggregationHourCounter =
metricsCreator.createCounter("metrics_aggregation", "The number of rows in
aggregation",
new MetricsTag.Keys("metricName", "level", "dimensionality"), new
MetricsTag.Values(modelName, "2", "hour"));
aggregationDayCounter =
metricsCreator.createCounter("metrics_aggregation", "The number of rows in
aggregation",
@@ -81,14 +72,5 @@ public class MetricsTransWorker extends
AbstractWorker<Metrics> {
aggregationHourCounter.inc();
monthPersistenceWorker.in(metrics.toMonth());
}
-
- /*
- * Minute persistent must be at the end of all time dimensionalities
- * Because #toHour, #toDay, #toMonth include clone inside, which could
avoid concurrency situation.
- */
- if (Objects.nonNull(minutePersistenceWorker)) {
- aggregationMinCounter.inc();
- minutePersistenceWorker.in(metrics);
- }
}
}