This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch l1-flush in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 72bfa0132185d8b787f71541cf1ee4c8b53bae03 Author: Wu Sheng <[email protected]> AuthorDate: Wed Jun 30 11:56:29 2021 +0800 Add the period of L1 aggregation flush to L2 aggregation --- CHANGES.md | 3 +- docs/en/setup/backend/configuration-vocabulary.md | 1 + .../src/main/resources/application.yml | 2 ++ .../oap/server/core/CoreModuleConfig.java | 9 +++++- .../oap/server/core/CoreModuleProvider.java | 3 +- .../analysis/worker/MetricsAggregateWorker.java | 32 +++++++++++++++------- .../analysis/worker/MetricsStreamProcessor.java | 15 ++++++++-- 7 files changed, 49 insertions(+), 16 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8ae7936..eb37dc3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -56,7 +56,8 @@ Release Notes. * Performance: cache regex pattern and result, optimize string concatenation in Envoy ALS analyzer. * Performance: cache metrics id and entity id in `Metrics` and `ISource`. * Performance: enhance persistent session mechanism, about differentiating cache timeout for different dimensionality - metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min. + metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min. +* Performance: Add L1 aggregation flush period, which reduces the CPU load and helps young GC. #### UI diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index c435c8f..3fa9376 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -23,6 +23,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | dataKeeperExecutePeriod|The execution period of TTL scheduler, unit is minute. Execution doesn't mean deleting data. The storage provider could override this, such as ElasticSearch storage.|SW_CORE_DATA_KEEPER_EXECUTE_PERIOD|5| | - | - | recordDataTTL|The lifecycle of record data. 
Record data includes traces, top n sampled records, and logs. Unit is day. Minimal value is 2.|SW_CORE_RECORD_DATA_TTL|3| | - | - | metricsDataTTL|The lifecycle of metrics data, including the metadata. Unit is day. Recommend metricsDataTTL >= recordDataTTL. Minimal value is 2.| SW_CORE_METRICS_DATA_TTL|7| +| - | - | l1FlushPeriod| The period of L1 aggregation flush to L2 aggregation. Unit is ms. | SW_CORE_L1_AGGREGATION_FLUSH_PERIOD | 500 | | - | - | enableDatabaseSession|Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, the metrics may not be accurate within that minute.|SW_CORE_ENABLE_DATABASE_SESSION|true| | - | - | topNReportPeriod|The execution period of top N sampler, which saves sampled data into the storage. Unit is minute.|SW_CORE_TOPN_REPORT_PERIOD|10| | - | - | activeExtraModelColumns|Append the names of entity, such as service name, into the metrics storage entities.|SW_CORE_ACTIVE_EXTRA_MODEL_COLUMNS|false| diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index e6db202..2f888fd 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -85,6 +85,8 @@ core: dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How often the data keeper executor runs periodically, unit is minute recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:3} # Unit is day metricsDataTTL: ${SW_CORE_METRICS_DATA_TTL:7} # Unit is day + # The period of L1 aggregation flush to L2 aggregation. Unit is ms. + l1FlushPeriod: ${SW_CORE_L1_AGGREGATION_FLUSH_PERIOD:500} # Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, # the metrics may not be accurate within that minute. 
enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index 8ffcc56..af5f735 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -46,8 +46,15 @@ public class CoreModuleConfig extends ModuleConfig { private String gRPCSslTrustedCAPath; private int maxConcurrentCallsPerConnection; private int maxMessageSize; - private boolean enableDatabaseSession; private int topNReportPeriod; + /** + * The period of L1 aggregation flush. Unit is ms. + */ + private long l1FlushPeriod = 500; + /** + * Enable database flush session. + */ + private boolean enableDatabaseSession; private final List<String> downsampling; /** * The period of doing data persistence. Unit is second. 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 9e99bd1..a8bbcbe 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -46,9 +46,9 @@ import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService; import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService; import org.apache.skywalking.oap.server.core.config.NamingControl; -import org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi; import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping; import org.apache.skywalking.oap.server.core.config.group.EndpointNameGroupingRuleWatcher; +import org.apache.skywalking.oap.server.core.config.group.openapi.EndpointGroupingRuleReader4Openapi; import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateInitializer; import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService; import org.apache.skywalking.oap.server.core.oal.rt.DisableOALDefine; @@ -289,6 +289,7 @@ public class CoreModuleProvider extends ModuleProvider { UITemplateManagementService.class, new UITemplateManagementService(getManager())); MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession()); + MetricsStreamProcessor.getInstance().setL1FlushPeriod(moduleConfig.getL1FlushPeriod()); TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod()); apdexThresholdConfig = new ApdexThresholdConfig(this); ApdexMetrics.setDICT(apdexThresholdConfig); diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index b6fd9b3..684c5fc 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -42,13 +42,15 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; */ @Slf4j public class MetricsAggregateWorker extends AbstractWorker<Metrics> { + public final long l1FlushPeriod; private AbstractWorker<Metrics> nextWorker; private final DataCarrier<Metrics> dataCarrier; private final MergableBufferedData<Metrics> mergeDataCache; private CounterMetrics aggregationCounter; + private long lastSendTime = 0; MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker, - String modelName) { + String modelName, long l1FlushPeriod) { super(moduleDefineHolder); this.nextWorker = nextWorker; this.mergeDataCache = new MergableBufferedData(); @@ -69,8 +71,10 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { .getService(MetricsCreator.class); aggregationCounter = metricsCreator.createCounter( "metrics_aggregation", "The number of rows in aggregation", - new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "minute") + new MetricsTag.Keys("metricName", "level", "dimensionality"), + new MetricsTag.Values(modelName, "1", "minute") ); + this.l1FlushPeriod = l1FlushPeriod; } /** @@ -93,14 +97,22 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { mergeDataCache.accept(metrics); }); - mergeDataCache.read().forEach( - data -> { - if (log.isDebugEnabled()) { - log.debug(data.toString()); + flush(); + } + + private void flush() 
{ + long currentTime = System.currentTimeMillis(); + if (currentTime - lastSendTime > l1FlushPeriod) { + mergeDataCache.read().forEach( + data -> { + if (log.isDebugEnabled()) { + log.debug(data.toString()); + } + nextWorker.in(data); } - nextWorker.in(data); - } - ); + ); + lastSendTime = currentTime; + } } private class AggregatorConsumer implements IConsumer<Metrics> { @@ -122,4 +134,4 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { public void onExit() { } } -} +} \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index 9d8faf7..3a40777 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -71,6 +71,12 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { private List<MetricsPersistentWorker> persistentWorkers = new ArrayList<>(); /** + * The period of L1 aggregation flush. Unit is ms. + */ + @Setter + @Getter + private long l1FlushPeriod = 500; + /** * Hold and forward CoreModuleConfig#enableDatabaseSession to the persistent worker. */ @Setter @@ -97,7 +103,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { * @param metricsClass data type of the streaming calculation. */ @Override - public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) throws StorageException { + public void create(ModuleDefineHolder moduleDefineHolder, + Stream stream, + Class<? 
extends Metrics> metricsClass) throws StorageException { this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass); } @@ -108,7 +116,8 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME) .provider() .getService(StorageBuilderFactory.class); - final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(metricsClass, stream.getBuilder()); + final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf( + metricsClass, stream.getBuilder()); StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IMetricsDAO metricsDAO; @@ -167,7 +176,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName); MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker( - moduleDefineHolder, remoteWorker, stream.getName()); + moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod); entryWorkers.put(metricsClass, aggregateWorker); }
