This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 50a0bcb Optimize L1 aggregation, enhance gRPC client between L1 and
L2 aggregation. (#7206)
50a0bcb is described below
commit 50a0bcbf7395c49029759a8cb1208271405cdcc4
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Wed Jun 30 19:14:20 2021 +0800
Optimize L1 aggregation, enhance gRPC client between L1 and L2 aggregation.
(#7206)
1. Performance: Add an L1 aggregation flush period, which reduces CPU load
and helps young GC.
2. Do not send data directly after the first aggregation; buffer it until the
flush period elapses to reduce network traffic (#6400).
3. Enhance the DataCarrier to notify the consumer when there is nothing in the
queue to consume.
4. The L1 aggregation flush period still works even when no further metrics
are generated, powered by item 3.
5. Fix an OOM in the gRPC remote client caused by a broken concurrency control mechanism.
---
CHANGES.md | 5 +-
.../datacarrier/consumer/ConsumerThread.java | 1 +
.../commons/datacarrier/consumer/IConsumer.java | 8 +++
.../consumer/MultipleChannelsConsumer.java | 3 +-
docs/en/setup/backend/configuration-vocabulary.md | 1 +
.../src/main/resources/application.yml | 2 +
.../oap/server/core/CoreModuleConfig.java | 9 ++-
.../oap/server/core/CoreModuleProvider.java | 3 +-
.../analysis/worker/MetricsAggregateWorker.java | 37 +++++++---
.../analysis/worker/MetricsStreamProcessor.java | 15 ++++-
.../core/remote/client/GRPCRemoteClient.java | 78 +++++++++++++---------
11 files changed, 114 insertions(+), 48 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 8ae7936..ef03150 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,8 @@ Release Notes.
* Extract dependency management to a bom.
* Add JDK 16 to test matrix.
+* DataCarrier consumer adds a new event notification: the `nothingToConsume`
method is called if the queue has no element to
+ consume.
#### Java Agent
@@ -56,7 +58,8 @@ Release Notes.
* Performance: cache regex pattern and result, optimize string concatenation
in Envy ALS analyzer.
* Performance: cache metrics id and entity id in `Metrics` and `ISource`.
* Performance: enhance persistent session mechanism, about differentiating
cache timeout for different dimensionality
- metrics. The timeout of the cache for minute and hour level metrics has been
prolonged to ~5 min.
+ metrics. The timeout of the cache for minute and hour level metrics has been
prolonged to ~5 min.
+* Performance: Add L1 aggregation flush period, which reduces the CPU load and
helps young GC.
#### UI
diff --git
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
index 5f35408..122f37e 100644
---
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
+++
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
@@ -80,6 +80,7 @@ public class ConsumerThread<T> extends Thread {
}
return true;
}
+ consumer.nothingToConsume();
return false;
}
diff --git
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
index 3c6dc0d..07793eb 100644
---
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
+++
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
@@ -28,4 +28,12 @@ public interface IConsumer<T> {
void onError(List<T> data, Throwable t);
void onExit();
+
+ /**
+ * Notify the implementation, if there is nothing fetched from the queue.
This could be used as a timer to trigger
+ * reaction if the queue has no element.
+ */
+ default void nothingToConsume() {
+ return;
+ }
}
diff --git
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index c65b8e8..953a21e 100644
---
a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++
b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -85,6 +85,7 @@ public class MultipleChannelsConsumer extends Thread {
}
return true;
}
+ target.consumer.nothingToConsume();
return false;
}
@@ -112,7 +113,7 @@ public class MultipleChannelsConsumer extends Thread {
}
private static class Group {
- private Channels channels;
+ private Channels channels;
private IConsumer consumer;
public Group(Channels channels, IConsumer consumer) {
diff --git a/docs/en/setup/backend/configuration-vocabulary.md
b/docs/en/setup/backend/configuration-vocabulary.md
index c435c8f..3fa9376 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -23,6 +23,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`.
**Receiver** mode
| - | - | dataKeeperExecutePeriod|The execution period of TTL scheduler, unit
is minute. Execution doesn't mean deleting data. The storage provider could
override this, such as ElasticSearch
storage.|SW_CORE_DATA_KEEPER_EXECUTE_PERIOD|5|
| - | - | recordDataTTL|The lifecycle of record data. Record data includes
traces, top n sampled records, and logs. Unit is day. Minimal value is
2.|SW_CORE_RECORD_DATA_TTL|3|
| - | - | metricsDataTTL|The lifecycle of metrics data, including the
metadata. Unit is day. Recommend metricsDataTTL >= recordDataTTL. Minimal value
is 2.| SW_CORE_METRICS_DATA_TTL|7|
+| - | - | l1FlushPeriod| The period of L1 aggregation flush to L2 aggregation.
Unit is ms. | SW_CORE_L1_AGGREGATION_FLUSH_PERIOD | 500 |
| - | - | enableDatabaseSession|Cache metrics data for 1 minute to reduce
database queries, and if the OAP cluster changes within that
minute.|SW_CORE_ENABLE_DATABASE_SESSION|true|
| - | - | topNReportPeriod|The execution period of top N sampler, which saves
sampled data into the storage. Unit is minute|SW_CORE_TOPN_REPORT_PERIOD|10|
| - | - | activeExtraModelColumns|Append the names of entity, such as service
name, into the metrics storage
entities.|SW_CORE_ACTIVE_EXTRA_MODEL_COLUMNS|false|
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml
b/oap-server/server-bootstrap/src/main/resources/application.yml
index e6db202..2f888fd 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -85,6 +85,8 @@ core:
dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How
often the data keeper executor runs periodically, unit is minute
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:3} # Unit is day
metricsDataTTL: ${SW_CORE_METRICS_DATA_TTL:7} # Unit is day
+ # The period of L1 aggregation flush to L2 aggregation. Unit is ms.
+ l1FlushPeriod: ${SW_CORE_L1_AGGREGATION_FLUSH_PERIOD:500}
# Cache metrics data for 1 minute to reduce database queries, and if the
OAP cluster changes within that minute,
# the metrics may not be accurate within that minute.
enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 8ffcc56..af5f735 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -46,8 +46,15 @@ public class CoreModuleConfig extends ModuleConfig {
private String gRPCSslTrustedCAPath;
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
- private boolean enableDatabaseSession;
private int topNReportPeriod;
+ /**
+ * The period of L1 aggregation flush. Unit is ms.
+ */
+ private long l1FlushPeriod = 500;
+ /**
+ * Enable database flush session.
+ */
+ private boolean enableDatabaseSession;
private final List<String> downsampling;
/**
* The period of doing data persistence. Unit is second.
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 9e99bd1..a8bbcbe 100755
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -46,9 +46,9 @@ import
org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
import
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.NamingControl;
-import
org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
import
org.apache.skywalking.oap.server.core.config.group.EndpointNameGroupingRuleWatcher;
+import
org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi;
import
org.apache.skywalking.oap.server.core.management.ui.template.UITemplateInitializer;
import
org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService;
import org.apache.skywalking.oap.server.core.oal.rt.DisableOALDefine;
@@ -289,6 +289,7 @@ public class CoreModuleProvider extends ModuleProvider {
UITemplateManagementService.class, new
UITemplateManagementService(getManager()));
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
+
MetricsStreamProcessor.getInstance().setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
apdexThresholdConfig = new ApdexThresholdConfig(this);
ApdexMetrics.setDICT(apdexThresholdConfig);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index b6fd9b3..4e7c59e 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -42,13 +42,15 @@ import
org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
*/
@Slf4j
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
+ public final long l1FlushPeriod;
private AbstractWorker<Metrics> nextWorker;
private final DataCarrier<Metrics> dataCarrier;
private final MergableBufferedData<Metrics> mergeDataCache;
private CounterMetrics aggregationCounter;
+ private long lastSendTime = 0;
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder,
AbstractWorker<Metrics> nextWorker,
- String modelName) {
+ String modelName, long l1FlushPeriod) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.mergeDataCache = new MergableBufferedData();
@@ -69,8 +71,10 @@ public class MetricsAggregateWorker extends
AbstractWorker<Metrics> {
.getService(MetricsCreator.class);
aggregationCounter = metricsCreator.createCounter(
"metrics_aggregation", "The number of rows in aggregation",
- new MetricsTag.Keys("metricName", "level", "dimensionality"), new
MetricsTag.Values(modelName, "1", "minute")
+ new MetricsTag.Keys("metricName", "level", "dimensionality"),
+ new MetricsTag.Values(modelName, "1", "minute")
);
+ this.l1FlushPeriod = l1FlushPeriod;
}
/**
@@ -93,14 +97,22 @@ public class MetricsAggregateWorker extends
AbstractWorker<Metrics> {
mergeDataCache.accept(metrics);
});
- mergeDataCache.read().forEach(
- data -> {
- if (log.isDebugEnabled()) {
- log.debug(data.toString());
+ flush();
+ }
+
+ private void flush() {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - lastSendTime > l1FlushPeriod) {
+ mergeDataCache.read().forEach(
+ data -> {
+ if (log.isDebugEnabled()) {
+ log.debug(data.toString());
+ }
+ nextWorker.in(data);
}
- nextWorker.in(data);
- }
- );
+ );
+ lastSendTime = currentTime;
+ }
}
private class AggregatorConsumer implements IConsumer<Metrics> {
@@ -121,5 +133,10 @@ public class MetricsAggregateWorker extends
AbstractWorker<Metrics> {
@Override
public void onExit() {
}
+
+ @Override
+ public void nothingToConsume() {
+ flush();
+ }
}
-}
+}
\ No newline at end of file
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 9d8faf7..3a40777 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -71,6 +71,12 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
private List<MetricsPersistentWorker> persistentWorkers = new
ArrayList<>();
/**
+ * The period of L1 aggregation flush. Unit is ms.
+ */
+ @Setter
+ @Getter
+ private long l1FlushPeriod = 500;
+ /**
* Hold and forward CoreModuleConfig#enableDatabaseSession to the
persistent worker.
*/
@Setter
@@ -97,7 +103,9 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
* @param metricsClass data type of the streaming calculation.
*/
@Override
- public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
Class<? extends Metrics> metricsClass) throws StorageException {
+ public void create(ModuleDefineHolder moduleDefineHolder,
+ Stream stream,
+ Class<? extends Metrics> metricsClass) throws
StorageException {
this.create(moduleDefineHolder, StreamDefinition.from(stream),
metricsClass);
}
@@ -108,7 +116,8 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
final StorageBuilderFactory storageBuilderFactory =
moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
- final Class<? extends StorageBuilder> builder =
storageBuilderFactory.builderOf(metricsClass, stream.getBuilder());
+ final Class<? extends StorageBuilder> builder =
storageBuilderFactory.builderOf(
+ metricsClass, stream.getBuilder());
StorageDAO storageDAO =
moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IMetricsDAO metricsDAO;
@@ -167,7 +176,7 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
MetricsRemoteWorker remoteWorker = new
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
- moduleDefineHolder, remoteWorker, stream.getName());
+ moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod);
entryWorkers.put(metricsClass, aggregateWorker);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 59a89b9..51fda20 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
@@ -37,17 +38,13 @@ import
org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This is a wrapper of the gRPC client for sending message to each other OAP
server. It contains a block queue to
* buffering the message and sending the message by batch.
*/
+@Slf4j
public class GRPCRemoteClient implements RemoteClient {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(GRPCRemoteClient.class);
-
private final int channelSize;
private final int bufferSize;
private final Address address;
@@ -76,13 +73,23 @@ public class GRPCRemoteClient implements RemoteClient {
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
-
.createCounter("remote_out_count", "The number(client side) of inside remote
inside aggregate rpc.", new MetricsTag.Keys("dest", "self"), new
MetricsTag.Values(address
-
.toString(), "N"));
+ .createCounter(
+ "remote_out_count",
+ "The number(client side) of
inside remote inside aggregate rpc.",
+ new MetricsTag.Keys("dest",
"self"), new MetricsTag.Values(
+ address
+ .toString(), "N")
+ );
remoteOutErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
-
.createCounter("remote_out_error_count", "The error number(client side) of
inside remote inside aggregate rpc.", new MetricsTag.Keys("dest", "self"), new
MetricsTag.Values(address
-
.toString(),
"N"));
+ .createCounter(
+ "remote_out_error_count",
+ "The error number(client
side) of inside remote inside aggregate rpc.",
+ new
MetricsTag.Keys("dest", "self"), new MetricsTag.Values(
+ address
+ .toString(), "N")
+ );
}
@Override
@@ -160,13 +167,13 @@ public class GRPCRemoteClient implements RemoteClient {
streamObserver.onCompleted();
} catch (Throwable t) {
remoteOutErrorCounter.inc();
- LOGGER.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
}
}
@Override
public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
- LOGGER.error(t.getMessage(), t);
+ log.error(t.getMessage(), t);
}
@Override
@@ -183,38 +190,47 @@ public class GRPCRemoteClient implements RemoteClient {
private StreamObserver<RemoteMessage> createStreamObserver() {
int sleepTotalMillis = 0;
int sleepMillis = 10;
- while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
- concurrentStreamObserverNumber.addAndGet(-1);
+ // Control the concurrency of gRPC streaming stub.
+ // If over 10 created and not finished/error, this blocks the method.
+ while (concurrentStreamObserverNumber.get() > 10) {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
- LOGGER.error(e.getMessage(), e);
+ log.error(e.getMessage(), e);
}
sleepTotalMillis += sleepMillis;
if (sleepTotalMillis > 60000) {
- LOGGER.warn("Remote client block times over 60 seconds.");
+ log.warn("Remote client [{}] block times over 60 seconds.
Current streaming number {}",
+ address, concurrentStreamObserverNumber.get()
+ );
+ // Reset sleepTotalMillis to avoid too many warn logs.
+ sleepTotalMillis = 0;
}
}
- return getStub().withDeadlineAfter(remoteTimeout,
TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
- @Override
- public void onNext(Empty empty) {
- }
-
- @Override
- public void onError(Throwable throwable) {
- concurrentStreamObserverNumber.addAndGet(-1);
- LOGGER.error(throwable.getMessage(), throwable);
- }
-
- @Override
- public void onCompleted() {
- concurrentStreamObserverNumber.addAndGet(-1);
- }
- });
+ final StreamObserver<RemoteMessage> remoteMessageStreamObserver
+ = getStub().withDeadlineAfter(remoteTimeout, TimeUnit.SECONDS)
+ .call(new StreamObserver<Empty>() {
+ @Override
+ public void onNext(Empty empty) {
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ concurrentStreamObserverNumber.addAndGet(-1);
+ log.error(throwable.getMessage(), throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ concurrentStreamObserverNumber.addAndGet(-1);
+ }
+ });
+ concurrentStreamObserverNumber.incrementAndGet();
+ return remoteMessageStreamObserver;
}
@Override