This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 1c0fc98 Support configure sampling rate dynamically for service
dimension on the backend side (#7554)
1c0fc98 is described below
commit 1c0fc982b2dc61ca7e452142df7f9846e867d71d
Author: HendSame <[email protected]>
AuthorDate: Mon Aug 30 14:52:20 2021 +0800
Support configure sampling rate dynamically for service dimension on the
backend side (#7554)
---
CHANGES.md | 1 +
apm-dist-es7/src/main/assembly/binary-es7.xml | 1 +
apm-dist/src/main/assembly/binary.xml | 1 +
docs/en/setup/backend/configuration-vocabulary.md | 3 +-
docs/en/setup/backend/dynamic-config.md | 3 +-
docs/en/setup/backend/trace-sampling.md | 28 +-
.../analyzer/provider/AnalyzerModuleConfig.java | 18 +-
.../analyzer/provider/AnalyzerModuleProvider.java | 17 +-
.../trace/TraceLatencyThresholdsAndWatcher.java | 78 -----
.../provider/trace/TraceSampleRateWatcher.java | 71 ----
.../provider/trace/TraceSamplingPolicyWatcher.java | 177 ++++++++++
.../parser/listener/SegmentAnalysisListener.java | 13 +-
.../trace/parser/listener/TraceSegmentSampler.java | 16 +-
.../SamplingPolicy.java} | 33 +-
.../trace/sampling/SamplingPolicySettings.java | 50 +++
.../sampling/SamplingPolicySettingsReader.java | 89 +++++
.../TraceLatencyThresholdsAndWatcherTest.java | 119 -------
.../provider/trace/TraceSampleRateWatcherTest.java | 120 -------
.../trace/TraceSamplingPolicyWatcherTest.java | 370 +++++++++++++++++++++
.../sampling/SamplingPolicySettingsReaderTest.java | 42 +++
.../resources/trace-sampling-policy-settings.yml | 25 ++
oap-server/server-bootstrap/pom.xml | 1 +
.../src/main/resources/application.yml | 4 +-
.../resources/trace-sampling-policy-settings.yml | 26 ++
24 files changed, 847 insertions(+), 459 deletions(-)
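
Note on the sampling decision introduced by this commit: the sketch below restates, in isolation, the rule that `TraceSamplingPolicyWatcher.shouldSample` appears to apply. A per-service `rate` or `duration` overrides the default when it is set, and a segment is kept if it is slow enough or its sample value falls inside the rate range. The class `SamplingPolicySketch`, its `Policy` holder, and `putService` are hypothetical names used only for illustration; they are not part of the SkyWalking code base, and the real watcher reads its values from `trace-sampling-policy-settings.yml` or dynamic configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained illustration; not the SkyWalking implementation.
final class SamplingPolicySketch {

    /** A policy entry; a null field means "fall back to the default policy". */
    static final class Policy {
        final Integer rate;      // precision 1/10000, so 10000 means 100%
        final Integer duration;  // milliseconds; -1 disables slow-trace sampling

        Policy(Integer rate, Integer duration) {
            this.rate = rate;
            this.duration = duration;
        }
    }

    private final Policy defaults;
    private final Map<String, Policy> services = new HashMap<>();

    SamplingPolicySketch(int defaultRate, int defaultDuration) {
        this.defaults = new Policy(defaultRate, defaultDuration);
    }

    /** Registers a per-service override; either value may be null. */
    void putService(String name, Integer rate, Integer duration) {
        services.put(name, new Policy(rate, duration));
    }

    /**
     * @param service  service name
     * @param sample   sample value of the segment, compared against the rate
     * @param duration latency of the segment in milliseconds
     * @return true if the segment should be kept
     */
    boolean shouldSample(String service, int sample, int duration) {
        Policy p = services.get(service);
        // Service-level values take priority over the defaults when present.
        int rate = (p != null && p.rate != null) ? p.rate : defaults.rate;
        int threshold = (p != null && p.duration != null) ? p.duration : defaults.duration;
        boolean slow = threshold > -1 && duration >= threshold;
        return slow || sample < rate;
    }

    public static void main(String[] args) {
        SamplingPolicySketch sketch = new SamplingPolicySketch(5000, -1);
        sketch.putService("svc", 1000, 10000);
        System.out.println(sketch.shouldSample("svc", 500, 0));
        System.out.println(sketch.shouldSample("svc", 2000, 0));
        System.out.println(sketch.shouldSample("svc", 2000, 10000));
        System.out.println(sketch.shouldSample("other", 2000, 0));
    }
}
```

Resolving the effective rate and threshold first, then applying one check, is a simplification of the four-clause boolean expression in the patch; the two forms should agree as long as the default policy always carries non-null values.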
diff --git a/CHANGES.md b/CHANGES.md
index 8ebee84..c441e9e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -40,6 +40,7 @@ Release Notes.
* Fix NPE when OAP nodes synchronize events with each other in cluster mode.
* Support k8s configmap grouped dynamic configurations.
* Add desc sort function in H2 and ElasticSearch implementations of
IBrowserLogQueryDAO
+* Support configure sampling policy by `configuration module` dynamically and
static configuration file `trace-sampling-policy-settings.yml` for service
dimension on the backend side. Dynamic configurations
`agent-analyzer.default.sampleRate` and
`agent-analyzer.default.slowTraceSegmentThreshold` are replaced by
`agent-analyzer.default.traceSamplingPolicy`. Static configurations
`agent-analyzer.default.sampleRate` and
`agent-analyzer.default.slowTraceSegmentThreshold` are replaced by `ag [...]
#### UI
diff --git a/apm-dist-es7/src/main/assembly/binary-es7.xml b/apm-dist-es7/src/main/assembly/binary-es7.xml
index 4f9353d..d19cb1d 100644
--- a/apm-dist-es7/src/main/assembly/binary-es7.xml
+++ b/apm-dist-es7/src/main/assembly/binary-es7.xml
@@ -58,6 +58,7 @@
<include>service-apdex-threshold.yml</include>
<include>endpoint-name-grouping.yml</include>
<include>metadata-service-mapping.yaml</include>
+ <include>trace-sampling-policy-settings.yml</include>
<include>oal/*.oal</include>
<include>fetcher-prom-rules/*.yaml</include>
<include>envoy-metrics-rules/*.yaml</include>
diff --git a/apm-dist/src/main/assembly/binary.xml b/apm-dist/src/main/assembly/binary.xml
index 731791b..bc047ab 100644
--- a/apm-dist/src/main/assembly/binary.xml
+++ b/apm-dist/src/main/assembly/binary.xml
@@ -58,6 +58,7 @@
<include>service-apdex-threshold.yml</include>
<include>endpoint-name-grouping.yml</include>
<include>metadata-service-mapping.yaml</include>
+ <include>trace-sampling-policy-settings.yml</include>
<include>oal/*.oal</include>
<include>fetcher-prom-rules/*.yaml</include>
<include>envoy-metrics-rules/*.yaml</include>
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index f6db25e..3d50453 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -160,12 +160,11 @@ core|default|role|Option values:
`Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | fetchTaskLogMaxSize | The maximum number of fetch task log in a
request. | SW_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE | 5000|
| - | - | connectionResponseFormat | The response format of connection to
influxDB. It can only be MSGPACK or JSON. |
SW_STORAGE_INFLUXDB_CONNECTION_RESPONSE_FORMAT | MSGPACK |
| agent-analyzer | default | Agent Analyzer. | SW_AGENT_ANALYZER | default |
-| - | -| sampleRate| Sampling rate for receiving trace. Precise to 1/10000.
10000 means a sampling rate of 100% by default.|SW_TRACE_SAMPLE_RATE|10000|
+| - | - | traceSamplingPolicySettingsFile | The sampling policy including `sampling rate` and `the threshold of trace segment latency` can be configured by the `traceSamplingPolicySettingsFile` file. | SW_TRACE_SAMPLING_POLICY_SETTINGS_FILE | `trace-sampling-policy-settings.yml` |
| - | - |slowDBAccessThreshold| The slow database access threshold (in
milliseconds). |SW_SLOW_DB_THRESHOLD|default:200,mongodb:100|
| - | - |forceSampleErrorSegment| When sampling mechanism is activated, this
config samples the error status segment and ignores the sampling rate.
|SW_FORCE_SAMPLE_ERROR_SEGMENT|true|
| - | - |segmentStatusAnalysisStrategy| Determines the final segment status
from span status. Available values are `FROM_SPAN_STATUS` , `FROM_ENTRY_SPAN`,
and `FROM_FIRST_SPAN`. `FROM_SPAN_STATUS` indicates that the segment status
would be error if any span has an error status. `FROM_ENTRY_SPAN` means that
the segment status would only be determined by the status of entry spans.
`FROM_FIRST_SPAN` means that the segment status would only be determined by the
status of the first span. |SW_ [...]
| - | - |noUpstreamRealAddressAgents| Exit spans with the component in the
list would not generate client-side instance relation metrics, since some
tracing plugins (e.g. Nginx-LUA and Envoy) can't collect the real peer IP
address. |SW_NO_UPSTREAM_REAL_ADDRESS|6000,9000|
-| - | - |slowTraceSegmentThreshold| Setting this threshold on latency (in
milliseconds) would cause the slow trace segments to be sampled if they use up
more time, even if the sampling mechanism is activated. The default value is
`-1`, which means that slow traces would not be sampled.
|SW_SLOW_TRACE_SEGMENT_THRESHOLD|-1|
| - | - |meterAnalyzerActiveFiles| Indicates which files could be instrumented
and analyzed. Multiple files are split by ",". |SW_METER_ANALYZER_ACTIVE_FILES||
| receiver-sharing-server|default| Sharing server provides new gRPC and
restful servers for data collection. Ana designates that servers in the core
module are to be used for internal communication only. | - | - |
| - | - | restHost| Binding IP of RESTful services. Services include GraphQL
query and HTTP data report. | SW_RECEIVER_SHARING_REST_HOST | - |
diff --git a/docs/en/setup/backend/dynamic-config.md b/docs/en/setup/backend/dynamic-config.md
index b17f0f8..6cc46fa 100755
--- a/docs/en/setup/backend/dynamic-config.md
+++ b/docs/en/setup/backend/dynamic-config.md
@@ -36,8 +36,7 @@ Supported configurations are as follows:
|core.default.apdexThreshold| The apdex threshold settings. Overrides
`service-apdex-threshold.yml`. | Same as
[`service-apdex-threshold.yml`](apdex-threshold.md). |
|core.default.endpoint-name-grouping| The endpoint name grouping setting.
Overrides `endpoint-name-grouping.yml`. | Same as
[`endpoint-name-grouping.yml`](endpoint-grouping-rules.md). |
|core.default.log4j-xml| The log4j xml configuration. Overrides `log4j2.xml`.
| Same as [`log4j2.xml`](dynamical-logging.md). |
-|agent-analyzer.default.sampleRate| Trace sampling. Overrides
`receiver-trace/default/sampleRate` of `application.yml`. | 10000 |
-|agent-analyzer.default.slowTraceSegmentThreshold| Setting this threshold on
latency (in milliseconds) would cause slow trace segments to be sampled if they
use up more time, even if the sampling mechanism is activated. The default
value is `-1`, which means slow traces will not be sampled. Overrides
`receiver-trace/default/slowTraceSegmentThreshold` of `application.yml`. | -1 |
+|agent-analyzer.default.traceSamplingPolicy| The sampling policy for the default and service dimensions. Overrides `trace-sampling-policy-settings.yml`. | Same as [`trace-sampling-policy-settings.yml`](trace-sampling.md). |
|configuration-discovery.default.agentConfigurations| The
ConfigurationDiscovery settings. | See
[`configuration-discovery.md`](https://github.com/apache/skywalking-java/blob/20fb8c81b3da76ba6628d34c12d23d3d45c973ef/docs/en/setup/service-agent/java-agent/configuration-discovery.md).
|
## Group Configuration
diff --git a/docs/en/setup/backend/trace-sampling.md b/docs/en/setup/backend/trace-sampling.md
index 965e10d..0217225 100644
--- a/docs/en/setup/backend/trace-sampling.md
+++ b/docs/en/setup/backend/trace-sampling.md
@@ -7,26 +7,44 @@ segments have been collected and reported by agents, the backend would do their
to understand why you should keep the traces as consistent as possible and try not to split them.
## Set the sample rate
-In the **agent-analyzer** module, you will find the `sampleRate` setting.
+In the **agent-analyzer** module, the sample rate is configured through the file referenced by `traceSamplingPolicySettingsFile`.
```yaml
agent-analyzer:
default:
...
- sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is
1/10000. 10000 means 100% sample in default.
+ # The default sampling rate and the default trace latency time configured by the 'traceSamplingPolicySettingsFile' file.
+ traceSamplingPolicySettingsFile: ${SW_TRACE_SAMPLING_POLICY_SETTINGS_FILE:trace-sampling-policy-settings.yml}
forceSampleErrorSegment: ${SW_FORCE_SAMPLE_ERROR_SEGMENT:true} # When
sampling mechanism activated, this config would make the error status segment
sampled, ignoring the sampling rate.
- slowTraceSegmentThreshold: ${SW_SLOW_TRACE_SEGMENT_THRESHOLD:-1} # Setting
this threshold about the latency would make the slow trace segments sampled if
they cost more time, even the sampling mechanism activated. The default value
is `-1`, which means would not sample slow traces. Unit, millisecond.
```
-`sampleRate` allows you to set the sample rate to this backend.
+The default `trace-sampling-policy-settings.yml` uses the following format. You can use [dynamic configuration](dynamic-config.md) to update the settings at runtime.
+```yaml
+default:
+ # Default sampling rate that replaces the 'agent-analyzer.default.sampleRate'
+ # The sample rate precision is 1/10000. 10000 means 100% sample in default.
+ rate: 10000
+ # Default trace latency time that replaces the 'agent-analyzer.default.slowTraceSegmentThreshold'
+ # Setting this threshold about the latency would make the slow trace segments sampled if they cost more time, even the sampling mechanism activated. The default value is `-1`, which means would not sample slow traces. Unit, millisecond.
+ duration: -1
+#services:
+# - name: serverName
+# rate: 1000 # Sampling rate of this specific service
+# duration: 10000 # Trace latency threshold for trace sampling for this specific service
+```
+
+`default.rate` allows you to set the sample rate of this backend.
The sample rate precision is 1/10000. 10000 means 100% sample by default.
`forceSampleErrorSegment` allows you to save all error segments when sampling
mechanism is activated.
When sampling mechanism is activated, this config would cause the error status
segment to be sampled, ignoring the sampling rate.
-`slowTraceSegmentThreshold` allows you to save all slow trace segments when
sampling mechanism is activated.
+`default.duration` allows you to save all slow trace segments when the sampling mechanism is activated.
Setting this threshold on latency (in milliseconds) would cause slow trace
segments to be sampled if they use up more time, even if the sampling mechanism
is activated. The default value is `-1`, which means that slow traces would not
be sampled.
+**Note:**
+`services.[].rate` and `services.[].duration` have a higher priority than `default.rate` and `default.duration`.
+
# Recommendation
You may choose to set different backend instances with different `sampleRate`
values, although we recommend that you set the values to be the same.
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/AnalyzerModuleConfig.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/AnalyzerModuleConfig.java
index 2d88c64..ed9c5ad 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/AnalyzerModuleConfig.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/AnalyzerModuleConfig.java
@@ -24,8 +24,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.DBLatencyThresholdsAndWatcher;
-import
org.apache.skywalking.oap.server.analyzer.provider.trace.TraceLatencyThresholdsAndWatcher;
-import
org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSampleRateWatcher;
+import org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSamplingPolicyWatcher;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.UninstrumentedGatewaysConfig;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.strategy.SegmentStatusStrategy;
import org.apache.skywalking.oap.server.core.Const;
@@ -39,11 +38,11 @@ import static
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.li
@Slf4j
public class AnalyzerModuleConfig extends ModuleConfig {
/**
- * The sample rate precision is 1/10000. 10000 means 100% sample in
default.
+ * The sample policy setting file
*/
@Setter
@Getter
- private int sampleRate = 10000;
+ private String traceSamplingPolicySettingsFile;
/**
* Some of the agent can not have the upstream real network address, such
as https://github.com/apache/skywalking-nginx-lua.
* service instance mapping and service instance client side relation are
ignored.
@@ -58,12 +57,6 @@ public class AnalyzerModuleConfig extends ModuleConfig {
@Setter
@Getter
private String slowDBAccessThreshold = "default:200";
- /**
- * Setting this threshold about the latency would make the slow trace
segments sampled if they cost more time, even the sampling mechanism activated.
The default value is `-1`, which means would not sample slow traces. Unit,
millisecond.
- */
- @Setter
- @Getter
- private int slowTraceSegmentThreshold = -1;
@Setter
@Getter
private DBLatencyThresholdsAndWatcher dbLatencyThresholdsAndWatcher;
@@ -72,10 +65,7 @@ public class AnalyzerModuleConfig extends ModuleConfig {
private UninstrumentedGatewaysConfig uninstrumentedGatewaysConfig;
@Setter
@Getter
- private TraceSampleRateWatcher traceSampleRateWatcher;
- @Setter
- @Getter
- private TraceLatencyThresholdsAndWatcher traceLatencyThresholdsAndWatcher;
+ private TraceSamplingPolicyWatcher traceSamplingPolicyWatcher;
/**
* Analysis trace status.
* <p>
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/AnalyzerModuleProvider.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/AnalyzerModuleProvider.java
index f05e93a..4fe93d5 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/AnalyzerModuleProvider.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/AnalyzerModuleProvider.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.analyzer.provider;
import java.util.List;
+
import lombok.Getter;
import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
import
org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
@@ -26,8 +27,7 @@ import
org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConf
import
org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService;
import
org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessService;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.DBLatencyThresholdsAndWatcher;
-import
org.apache.skywalking.oap.server.analyzer.provider.trace.TraceLatencyThresholdsAndWatcher;
-import
org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSampleRateWatcher;
+import org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSamplingPolicyWatcher;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.UninstrumentedGatewaysConfig;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.ISegmentParserService;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.SegmentParserListenerManager;
@@ -57,9 +57,7 @@ public class AnalyzerModuleProvider extends ModuleProvider {
@Getter
private SegmentParserServiceImpl segmentParserService;
@Getter
- private TraceSampleRateWatcher traceSampleRateWatcher;
- @Getter
- private TraceLatencyThresholdsAndWatcher traceLatencyThresholdsAndWatcher;
+ private TraceSamplingPolicyWatcher traceSamplingPolicyWatcher;
private List<MeterConfig> meterConfigs;
@Getter
@@ -90,13 +88,11 @@ public class AnalyzerModuleProvider extends ModuleProvider {
uninstrumentedGatewaysConfig = new UninstrumentedGatewaysConfig(this);
- traceSampleRateWatcher = new TraceSampleRateWatcher(this);
- traceLatencyThresholdsAndWatcher = new
TraceLatencyThresholdsAndWatcher(this);
+ traceSamplingPolicyWatcher = new TraceSamplingPolicyWatcher(moduleConfig, this);
moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds);
moduleConfig.setUninstrumentedGatewaysConfig(uninstrumentedGatewaysConfig);
- moduleConfig.setTraceSampleRateWatcher(traceSampleRateWatcher);
-
moduleConfig.setTraceLatencyThresholdsAndWatcher(traceLatencyThresholdsAndWatcher);
+ moduleConfig.setTraceSamplingPolicyWatcher(traceSamplingPolicyWatcher);
segmentParserService = new SegmentParserServiceImpl(getManager(),
moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class,
segmentParserService);
@@ -121,8 +117,7 @@ public class AnalyzerModuleProvider extends ModuleProvider {
DynamicConfigurationService.class);
dynamicConfigurationService.registerConfigChangeWatcher(thresholds);
dynamicConfigurationService.registerConfigChangeWatcher(uninstrumentedGatewaysConfig);
-
dynamicConfigurationService.registerConfigChangeWatcher(traceSampleRateWatcher);
-
dynamicConfigurationService.registerConfigChangeWatcher(traceLatencyThresholdsAndWatcher);
+ dynamicConfigurationService.registerConfigChangeWatcher(traceSamplingPolicyWatcher);
segmentParserService.setListenerManager(listenerManager());
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcher.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcher.java
deleted file mode 100644
index 90e635d..0000000
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcher.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.analyzer.provider.trace;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
-import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
-import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-
-/**
- * This threshold watcher about the latency would make the slow trace segments
sampled if they cost more time,
- * even the sampling mechanism activated. The default value is `-1`, which
means would not sample slow traces. Unit, millisecond.
- */
-@Slf4j
-public class TraceLatencyThresholdsAndWatcher extends ConfigChangeWatcher {
- private AtomicInteger slowTraceSegmentThreshold;
-
- public TraceLatencyThresholdsAndWatcher(ModuleProvider provider) {
- super(AnalyzerModule.NAME, provider, "slowTraceSegmentThreshold");
- slowTraceSegmentThreshold = new AtomicInteger();
- slowTraceSegmentThreshold.set(getDefaultValue());
- }
-
- private void activeSetting(String config) {
- if (log.isDebugEnabled()) {
- log.debug("Updating using new static config: {}", config);
- }
- try {
- slowTraceSegmentThreshold.set(Integer.parseInt(config));
- } catch (NumberFormatException ex) {
- log.error("Cannot load slowTraceThreshold from: {}", config, ex);
- }
- }
-
- @Override
- public void notify(ConfigChangeEvent value) {
- if (EventType.DELETE.equals(value.getEventType())) {
- activeSetting(String.valueOf(getDefaultValue()));
- } else {
- activeSetting(value.getNewValue());
- }
- }
-
- @Override
- public String value() {
- return String.valueOf(slowTraceSegmentThreshold.get());
- }
-
- private int getDefaultValue() {
- return ((AnalyzerModuleConfig)
this.getProvider().createConfigBeanIfAbsent()).getSlowTraceSegmentThreshold();
- }
-
- public int getSlowTraceSegmentThreshold() {
- return slowTraceSegmentThreshold.get();
- }
-
- public boolean shouldSample(int duration) {
- return (slowTraceSegmentThreshold.get() > -1) && (duration >=
slowTraceSegmentThreshold.get());
- }
-}
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcher.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcher.java
deleted file mode 100644
index 72c9c9b..0000000
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcher.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.analyzer.provider.trace;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
-import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
-import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-@Slf4j
-public class TraceSampleRateWatcher extends ConfigChangeWatcher {
- private AtomicReference<Integer> sampleRate;
-
- public TraceSampleRateWatcher(ModuleProvider provider) {
- super(AnalyzerModule.NAME, provider, "sampleRate");
- sampleRate = new AtomicReference<>();
- sampleRate.set(getDefaultValue());
- }
-
- private void activeSetting(String config) {
- if (log.isDebugEnabled()) {
- log.debug("Updating using new static config: {}", config);
- }
- try {
- sampleRate.set(Integer.parseInt(config));
- } catch (NumberFormatException ex) {
- log.error("Cannot load sampleRate from: {}", config, ex);
- }
- }
-
- @Override
- public void notify(ConfigChangeEvent value) {
- if (EventType.DELETE.equals(value.getEventType())) {
- activeSetting(String.valueOf(getDefaultValue()));
- } else {
- activeSetting(value.getNewValue());
- }
- }
-
- @Override
- public String value() {
- return String.valueOf(sampleRate.get());
- }
-
- private int getDefaultValue() {
- return ((AnalyzerModuleConfig)
this.getProvider().createConfigBeanIfAbsent()).getSampleRate();
- }
-
- public int getSampleRate() {
- return sampleRate.get();
- }
-}
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSamplingPolicyWatcher.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSamplingPolicyWatcher.java
new file mode 100644
index 0000000..d0eae11
--- /dev/null
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSamplingPolicyWatcher.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.analyzer.provider.trace;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
+import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
+import org.apache.skywalking.oap.server.analyzer.provider.trace.sampling.SamplingPolicy;
+import org.apache.skywalking.oap.server.analyzer.provider.trace.sampling.SamplingPolicySettings;
+import org.apache.skywalking.oap.server.analyzer.provider.trace.sampling.SamplingPolicySettingsReader;
+import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+
+import java.io.StringReader;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.isNull;
+
+@Slf4j
+public class TraceSamplingPolicyWatcher extends ConfigChangeWatcher {
+
+ private final AtomicReference<String> settingsString = new AtomicReference<>(null);
+ private final AtomicReference<SamplingPolicySettings> samplingPolicySettings = new AtomicReference<>(null);
+ private final SamplingPolicySettings defaultSamplingPolicySettings;
+
+ public TraceSamplingPolicyWatcher(AnalyzerModuleConfig moduleConfig, ModuleProvider provider) {
+ super(AnalyzerModule.NAME, provider, "traceSamplingPolicy");
+ this.defaultSamplingPolicySettings = parseFromFile(moduleConfig.getTraceSamplingPolicySettingsFile());
+ loadDefaultPolicySettings();
+ }
+
+ @Override
+ public void notify(ConfigChangeEvent value) {
+ if (EventType.DELETE.equals(value.getEventType()) || StringUtil.isBlank(value.getNewValue())) {
+ this.settingsString.set(null);
+ log.info("[trace-sampling-policy] Delete trace-sampling-policy,use default config");
+ loadDefaultPolicySettings();
+ } else {
+ activeSetting(value.getNewValue());
+ }
+ }
+
+ @Override
+ public String value() {
+ return this.settingsString.get();
+ }
+
+ /**
+ * Determine whether a trace segment needs to be sampled.
+ *
+ * @param service service's name
+ * @param sample sample rate of trace segment
+ * @param duration duration of trace segment
+ * @return true if the trace segment should be sampled
+ */
+ public boolean shouldSample(String service, int sample, int duration) {
+ SamplingPolicy samplingPolicy = this.samplingPolicySettings.get().get(service);
+ if (samplingPolicy == null) {
+ return shouldSampleByDefault(sample, duration);
+ }
+ return shouldSampleService(samplingPolicy, sample, duration);
+ }
+
+ /**
+ * A trace segment is sampled when 'duration' is at or over the default slow threshold of trace segments,
+ * or when 'sample' is within [0, defaultSamplingRate).
+ *
+ * @param sample sample rate of trace segment
+ * @param duration duration of trace segment
+ * @return true if the default policy samples the trace segment
+ */
+ private boolean shouldSampleByDefault(int sample, int duration) {
+ return isOverDefaultSlowThreshold(duration) || withinDefaultRateRange(sample);
+ }
+
+ /**
+ * Uses the specific service's 'trace segment's slow threshold' if it is not null, and the specific service's
+ * 'samplingRate' if it is not null. Otherwise, the default sampling policy is used for the missing value.
+ * <p>
+ * The priority of sampling policy: 'trace segment's slow threshold' > 'samplingRate', no matter whether the
+ * service's or the global value applies. A trace segment is sampled when 'duration' is at or over the
+ * applicable slow threshold, or when 'sample' is within [0, samplingRate).
+ *
+ * @param samplingPolicy the sampling policy of the specific service
+ * @param sample sample rate of trace segment
+ * @param duration duration of trace segment
+ * @return true if the trace segment should be sampled
+ */
+ private boolean shouldSampleService(SamplingPolicy samplingPolicy, int sample, int duration) {
+ return (samplingPolicy.getDuration() != null && isOverSlowThreshold(duration, samplingPolicy.getDuration()))
+ || (samplingPolicy.getRate() != null && withinRateRange(sample, samplingPolicy.getRate()))
+ // global policy
+ || (samplingPolicy.getDuration() == null && isOverDefaultSlowThreshold(duration))
+ || (samplingPolicy.getRate() == null && withinDefaultRateRange(sample));
+ }
+
+ private boolean withinDefaultRateRange(int sample) {
+ return withinRateRange(sample, this.samplingPolicySettings.get().getDefaultPolicy().getRate());
+ }
+
+ private boolean isOverDefaultSlowThreshold(int duration) {
+ return isOverSlowThreshold(duration, this.samplingPolicySettings.get().getDefaultPolicy().getDuration());
+ }
+
+ private boolean isOverSlowThreshold(int currentDuration, int policyDuration) {
+ return (policyDuration > -1) && (currentDuration >= policyDuration);
+ }
+
+ private boolean withinRateRange(int currentSample, int policySample) {
+ return currentSample < policySample;
+ }
+
+ private void loadDefaultPolicySettings() {
+ this.samplingPolicySettings.set(defaultSamplingPolicySettings);
+ log.info("[trace-sampling-policy] use trace-sample-policy in static file : {}", this.samplingPolicySettings);
+ }
+
+ private void activeSetting(String config) {
+ if (log.isDebugEnabled()) {
+ log.debug("[trace-sampling-policy] Updating using new config: {}", config);
+ }
+ onUpdated(parseFromYml(config));
+ }
+
+ private void onUpdated(final SamplingPolicySettings
samplingPolicySettings) {
+ if (!isNull(samplingPolicySettings)) {
+ this.samplingPolicySettings.set(samplingPolicySettings);
+ log.info("[trace-sampling-policy] Updating trace-sample-policy
with: {}", samplingPolicySettings);
+ } else {
+ log.info(
+ "[trace-sampling-policy] Parse yaml fail, retain last
configuration: {}", this.samplingPolicySettings);
+ }
+ }
+
+ private SamplingPolicySettings parseFromFile(final String file) {
+ try {
+ SamplingPolicySettingsReader reader = new
SamplingPolicySettingsReader(ResourceUtils.read(file));
+ return reader.readSettings();
+ } catch (Exception e) {
+ log.error("[trace-sampling-policy] Cannot load configs from: {}",
file, e);
+ }
+ // It must have a default config on init
+ return new SamplingPolicySettings();
+ }
+
+ private SamplingPolicySettings parseFromYml(final String ymlContent) {
+ try {
+ SamplingPolicySettingsReader reader = new
SamplingPolicySettingsReader(new StringReader(ymlContent));
+ SamplingPolicySettings settings = reader.readSettings();
+ this.settingsString.set(ymlContent);
+ return settings;
+ } catch (Exception e) {
+ log.error("[trace-sampling-policy] Failed to parse yml content:
\n{}", ymlContent, e);
+ }
+ // Parsing may fail on a config update; return null so the caller keeps the last valid settings
+ return null;
+ }
+
+}
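
Note on the decision logic in shouldSampleService above: the four OR-ed
clauses appear to reduce to "resolve each field against the default policy
independently, then sample if either the slow threshold or the rate check
passes". The following is a minimal stand-alone sketch of that reading, not
the committed code. The class name SamplingDecisionSketch and the two
constants are hypothetical; the constants mirror the defaults set in the
SamplingPolicySettings constructor (rate 10000, duration -1).

```java
/** Hypothetical stand-alone sketch; it does not use the SkyWalking classes. */
final class SamplingDecisionSketch {
    // Assumed global defaults, mirroring new SamplingPolicy(10000, -1).
    static final int DEFAULT_RATE = 10000;
    static final int DEFAULT_DURATION = -1;

    /** A threshold of -1 (or lower) disables slow-segment sampling. */
    static boolean isOverSlowThreshold(int duration, int threshold) {
        return threshold > -1 && duration >= threshold;
    }

    /**
     * serviceRate and serviceDuration may be null, meaning "not configured for
     * this service"; each one then falls back to the global default on its own.
     */
    static boolean shouldSample(Integer serviceRate, Integer serviceDuration,
                                int sample, int duration) {
        int effectiveDuration = serviceDuration != null ? serviceDuration : DEFAULT_DURATION;
        int effectiveRate = serviceRate != null ? serviceRate : DEFAULT_RATE;
        return isOverSlowThreshold(duration, effectiveDuration) || sample < effectiveRate;
    }

    public static void main(String[] args) {
        // Service with rate 1000 and no duration: the threshold comes from the default.
        System.out.println(shouldSample(1000, null, 5000, 100));
        // Service with both fields set: a slow segment is kept despite the rate.
        System.out.println(shouldSample(1000, 3000, 5000, 3000));
    }
}
```

Under this reading, a service that only sets 'rate' still inherits the
global 'duration', and vice versa.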
diff --git
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
index f3597bb..c2b87aa 100644
---
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
+++
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java
@@ -26,7 +26,6 @@ import
org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
-import
org.apache.skywalking.oap.server.analyzer.provider.trace.TraceLatencyThresholdsAndWatcher;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.strategy.SegmentStatusAnalyzer;
import
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.strategy.SegmentStatusStrategy;
import org.apache.skywalking.oap.server.core.Const;
@@ -54,7 +53,6 @@ public class SegmentAnalysisListener implements
FirstAnalysisListener, EntryAnal
private final NamingControl namingControl;
private final List<String> searchableTagKeys;
private final SegmentStatusAnalyzer segmentStatusAnalyzer;
- private final TraceLatencyThresholdsAndWatcher
traceLatencyThresholdsAndWatcher;
private final Segment segment = new Segment();
private SAMPLE_STATUS sampleStatus = SAMPLE_STATUS.UNKNOWN;
@@ -139,12 +137,10 @@ public class SegmentAnalysisListener implements
FirstAnalysisListener, EntryAnal
duration = accurateDuration > Integer.MAX_VALUE ? Integer.MAX_VALUE :
(int) accurateDuration;
if (sampleStatus.equals(SAMPLE_STATUS.UNKNOWN) ||
sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
- if (sampler.shouldSample(segmentObject.getTraceId())) {
+ if (sampler.shouldSample(segmentObject, duration)) {
sampleStatus = SAMPLE_STATUS.SAMPLED;
} else if (isError && forceSampleErrorSegment) {
sampleStatus = SAMPLE_STATUS.SAMPLED;
- } else if
(traceLatencyThresholdsAndWatcher.shouldSample(duration)) {
- sampleStatus = SAMPLE_STATUS.SAMPLED;
} else {
sampleStatus = SAMPLE_STATUS.IGNORE;
}
@@ -191,7 +187,6 @@ public class SegmentAnalysisListener implements
FirstAnalysisListener, EntryAnal
private final NamingControl namingControl;
private final List<String> searchTagKeys;
private final SegmentStatusAnalyzer segmentStatusAnalyzer;
- private final TraceLatencyThresholdsAndWatcher
traceLatencyThresholdsAndWatcher;
public Factory(ModuleManager moduleManager, AnalyzerModuleConfig
config) {
this.sourceReceiver =
moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
@@ -199,14 +194,13 @@ public class SegmentAnalysisListener implements
FirstAnalysisListener, EntryAnal
.provider()
.getService(ConfigService.class);
this.searchTagKeys =
Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA));
- this.sampler = new
TraceSegmentSampler(config.getTraceSampleRateWatcher());
+ this.sampler = new
TraceSegmentSampler(config.getTraceSamplingPolicyWatcher());
this.forceSampleErrorSegment = config.isForceSampleErrorSegment();
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
this.segmentStatusAnalyzer =
SegmentStatusStrategy.findByName(config.getSegmentStatusAnalysisStrategy())
.getExceptionAnalyzer();
- this.traceLatencyThresholdsAndWatcher =
config.getTraceLatencyThresholdsAndWatcher();
}
@Override
@@ -217,8 +211,7 @@ public class SegmentAnalysisListener implements
FirstAnalysisListener, EntryAnal
forceSampleErrorSegment,
namingControl,
searchTagKeys,
- segmentStatusAnalyzer,
- traceLatencyThresholdsAndWatcher
+ segmentStatusAnalyzer
);
}
}
diff --git
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/TraceSegmentSampler.java
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/TraceSegmentSampler.java
index 4623a2c..d3b8d25 100644
---
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/TraceSegmentSampler.java
+++
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/TraceSegmentSampler.java
@@ -18,20 +18,22 @@
package
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
-import
org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSampleRateWatcher;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
+import
org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSamplingPolicyWatcher;
/**
* The sampler makes the sampling mechanism works at backend side. Sample
result: [0,sampleRate) sampled, (sampleRate,~)
* ignored
*/
+@RequiredArgsConstructor
public class TraceSegmentSampler {
- private TraceSampleRateWatcher traceSampleRateWatcher;
+ private final TraceSamplingPolicyWatcher traceSamplingPolicyWatcher;
- public TraceSegmentSampler(TraceSampleRateWatcher traceSampleRateWatcher) {
- this.traceSampleRateWatcher = traceSampleRateWatcher;
+ public boolean shouldSample(SegmentObject segmentObject, int duration) {
+ int sample = Math.abs(segmentObject.getTraceId().hashCode()) % 10000;
+ String serviceName = segmentObject.getService();
+ return traceSamplingPolicyWatcher.shouldSample(serviceName, sample,
duration);
}
- public boolean shouldSample(String traceId) {
- return Math.abs(traceId.hashCode()) % 10000 <
traceSampleRateWatcher.getSampleRate();
- }
}
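
Note on the bucket computation in shouldSample above: Math.abs returns a
negative value for Integer.MIN_VALUE, so a trace ID whose hashCode() is
exactly Integer.MIN_VALUE yields a negative 'sample', which is below any
non-negative rate. The sketch below shows one possible alternative using
Math.floorMod, which always returns a value in [0, 10000). It is an
illustration only, not part of this commit; the class and method names are
hypothetical. Be aware that floorMod places negative hashes in different
buckets than Math.abs(...) % 10000 does, so the two are not interchangeable
across a rolling upgrade if bucket stability matters.

```java
/** Hypothetical sketch of a non-negative bucket computation. */
final class TraceIdBucketSketch {
    // Same precision as the commit: 10000 buckets, i.e. 1/10000 granularity.
    static final int BUCKETS = 10000;

    /** Maps any int hash, including Integer.MIN_VALUE, into [0, BUCKETS). */
    static int bucketOf(int hash) {
        return Math.floorMod(hash, BUCKETS);
    }

    /** Convenience overload for a trace ID string. */
    static int bucketOf(String traceId) {
        return bucketOf(traceId.hashCode());
    }

    public static void main(String[] args) {
        // The abs-based form goes negative for this one input.
        System.out.println(Math.abs(Integer.MIN_VALUE) % BUCKETS);
        // The floorMod-based form stays inside the bucket range.
        System.out.println(bucketOf(Integer.MIN_VALUE));
    }
}
```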
diff --git
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/TraceSegmentSampler.java
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicy.java
similarity index 57%
copy from
oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/TraceSegmentSampler.java
copy to
oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicy.java
index 4623a2c..fa7675f 100644
---
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/TraceSegmentSampler.java
+++
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicy.java
@@ -13,25 +13,22 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-package
org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
-
-import
org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSampleRateWatcher;
-
-/**
- * The sampler makes the sampling mechanism works at backend side. Sample
result: [0,sampleRate) sampled, (sampleRate,~)
- * ignored
- */
-public class TraceSegmentSampler {
- private TraceSampleRateWatcher traceSampleRateWatcher;
+package org.apache.skywalking.oap.server.analyzer.provider.trace.sampling;
- public TraceSegmentSampler(TraceSampleRateWatcher traceSampleRateWatcher) {
- this.traceSampleRateWatcher = traceSampleRateWatcher;
- }
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
- public boolean shouldSample(String traceId) {
- return Math.abs(traceId.hashCode()) % 10000 <
traceSampleRateWatcher.getSampleRate();
- }
-}
+@Getter
+@Setter
+@ToString
+@AllArgsConstructor
+@NoArgsConstructor
+public class SamplingPolicy {
+ private Integer rate;
+ private Integer duration;
+}
\ No newline at end of file
diff --git
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettings.java
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettings.java
new file mode 100644
index 0000000..5009ed5
--- /dev/null
+++
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettings.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.analyzer.provider.trace.sampling;
+
+import lombok.Getter;
+import lombok.ToString;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@ToString
+public class SamplingPolicySettings {
+
+ @Getter
+ private SamplingPolicy defaultPolicy;
+ private Map<String, SamplingPolicy> services;
+
+ /**
+ * The sample rate precision is 1/10000; the default of 10000 means 100%
+ * of trace segments are sampled. The latency threshold forces slow trace
+ * segments to be sampled once their duration reaches it, even while
+ * rate-based sampling is active. Its default is `-1`, which disables
+ * slow-trace sampling. Unit: milliseconds.
+ */
+ public SamplingPolicySettings() {
+ this.defaultPolicy = new SamplingPolicy(10000, -1);
+ this.services = new ConcurrentHashMap<>();
+ }
+
+ public void add(String service, SamplingPolicy samplingPolicy) {
+ this.services.put(service, samplingPolicy);
+ }
+
+ public SamplingPolicy get(String service) {
+ return this.services.get(service);
+ }
+}
diff --git
a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettingsReader.java
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettingsReader.java
new file mode 100644
index 0000000..18aecdc
--- /dev/null
+++
b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettingsReader.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.analyzer.provider.trace.sampling;
+
+import org.apache.skywalking.apm.util.StringUtil;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * SamplingPolicySettingsReader parses the given
+ * `trace-sampling-policy-settings.yml` config file into the target
+ * {@link SamplingPolicySettings}.
+ */
+public class SamplingPolicySettingsReader {
+ private Map yamlData;
+
+ public SamplingPolicySettingsReader(InputStream inputStream) {
+ Yaml yaml = new Yaml(new SafeConstructor());
+ yamlData = yaml.load(inputStream);
+ }
+
+ public SamplingPolicySettingsReader(Reader io) {
+ Yaml yaml = new Yaml(new SafeConstructor());
+ yamlData = yaml.load(io);
+ }
+
+ /**
+ * Read policy config file to {@link SamplingPolicySettings}
+ */
+ public SamplingPolicySettings readSettings() {
+ SamplingPolicySettings samplingPolicySettings = new
SamplingPolicySettings();
+ if (Objects.nonNull(yamlData)) {
+ readDefaultSamplingPolicy(samplingPolicySettings);
+ readServicesSamplingPolicy(samplingPolicySettings);
+ }
+ return samplingPolicySettings;
+ }
+
+ private void readDefaultSamplingPolicy(SamplingPolicySettings
samplingPolicySettings) {
+ Map<String, Object> objectMap = (Map<String, Object>)
yamlData.get("default");
+ if (objectMap == null) {
+ return;
+ }
+ if (objectMap.get("rate") != null) {
+ samplingPolicySettings.getDefaultPolicy().setRate((Integer)
objectMap.get("rate"));
+ }
+ if (objectMap.get("duration") != null) {
+ samplingPolicySettings.getDefaultPolicy().setDuration((Integer)
objectMap.get("duration"));
+ }
+ }
+
+ private void readServicesSamplingPolicy(SamplingPolicySettings
samplingPolicySettings) {
+ Map<String, Object> objectMap = (Map<String, Object>) yamlData;
+ Object servicesObject = objectMap.get("services");
+ if (servicesObject != null) {
+ List<Map<String, Object>> serviceList = (List<Map<String,
Object>>) servicesObject;
+ serviceList.forEach(service -> {
+ String name = (String) service.get("name");
+ if (StringUtil.isBlank(name)) {
+ return;
+ }
+ SamplingPolicy samplingPolicy = new SamplingPolicy();
+ samplingPolicy.setRate(service.get("rate") == null ? null :
(Integer) service.get("rate"));
+ samplingPolicy.setDuration(service.get("duration") == null ?
null : (Integer) service.get("duration"));
+ samplingPolicySettings.add(name, samplingPolicy);
+ });
+ }
+ }
+}
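
Note on the casts in the reader above: a non-numeric value such as
"rate: abc" reaches the (Integer) cast as a String and fails with a
ClassCastException, which the watcher's parseFromYml then catches. The
sketch below shows one way the same extraction could report the offending
key explicitly. It is an assumption-laden illustration, not the committed
code: PolicyMapSketch and intOrNull are hypothetical names, and the input
map stands in for what a YAML loader would return for the "default" block.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of type-checked field extraction from a parsed YAML map. */
final class PolicyMapSketch {

    /**
     * Returns the Integer stored under key, or null when the key is absent.
     * Throws IllegalArgumentException naming the key when the value has
     * another type.
     */
    static Integer intOrNull(Map<String, Object> map, String key) {
        Object value = map.get(key);
        if (value == null) {
            return null;
        }
        if (value instanceof Integer) {
            return (Integer) value;
        }
        throw new IllegalArgumentException(
            "'" + key + "' must be an integer but was: " + value);
    }

    public static void main(String[] args) {
        // Stand-in for the parsed "default" block: rate set, duration omitted.
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("rate", 8000);
        System.out.println(intOrNull(defaults, "rate"));
        System.out.println(intOrNull(defaults, "duration"));
    }
}
```

A caller could still catch the exception and retain the previous settings,
as onUpdated does when parsing returns null.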
diff --git
a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcherTest.java
b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcherTest.java
deleted file mode 100644
index bb1e03c..0000000
---
a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcherTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.analyzer.provider.trace;
-
-import java.util.Optional;
-import java.util.Set;
-import
org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider;
-import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
-import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
-import
org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
-import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-@RunWith(MockitoJUnitRunner.class)
-public class TraceLatencyThresholdsAndWatcherTest {
- private AnalyzerModuleProvider provider;
-
- @Before
- public void init() {
- provider = new AnalyzerModuleProvider();
- }
-
- @Test
- public void testInit() {
- TraceLatencyThresholdsAndWatcher traceLatencyThresholdsAndWatcher =
new TraceLatencyThresholdsAndWatcher(provider);
-
Assert.assertEquals(traceLatencyThresholdsAndWatcher.getSlowTraceSegmentThreshold(),
-1);
- Assert.assertEquals(traceLatencyThresholdsAndWatcher.value(), "-1");
- }
-
- @Test(timeout = 20000)
- public void testDynamicUpdate() throws InterruptedException {
- ConfigWatcherRegister register = new MockConfigWatcherRegister(3);
-
- TraceLatencyThresholdsAndWatcher watcher = new
TraceLatencyThresholdsAndWatcher(provider);
- register.registerConfigChangeWatcher(watcher);
- register.start();
-
- while (watcher.getSlowTraceSegmentThreshold() < 0) {
- Thread.sleep(2000);
- }
- assertThat(watcher.getSlowTraceSegmentThreshold(), is(3000));
- assertThat(provider.getModuleConfig().getSlowTraceSegmentThreshold(),
is(-1));
- }
-
- @Test
- public void testNotify() {
- TraceLatencyThresholdsAndWatcher traceLatencyThresholdsAndWatcher =
new TraceLatencyThresholdsAndWatcher(provider);
- ConfigChangeWatcher.ConfigChangeEvent value1 = new
ConfigChangeWatcher.ConfigChangeEvent(
- "8000", ConfigChangeWatcher.EventType.MODIFY);
-
- traceLatencyThresholdsAndWatcher.notify(value1);
-
Assert.assertEquals(traceLatencyThresholdsAndWatcher.getSlowTraceSegmentThreshold(),
8000);
- Assert.assertEquals(traceLatencyThresholdsAndWatcher.value(), "8000");
-
- ConfigChangeWatcher.ConfigChangeEvent value2 = new
ConfigChangeWatcher.ConfigChangeEvent(
- "8000", ConfigChangeWatcher.EventType.DELETE);
-
- traceLatencyThresholdsAndWatcher.notify(value2);
-
Assert.assertEquals(traceLatencyThresholdsAndWatcher.getSlowTraceSegmentThreshold(),
-1);
- Assert.assertEquals(traceLatencyThresholdsAndWatcher.value(), "-1");
-
- ConfigChangeWatcher.ConfigChangeEvent value3 = new
ConfigChangeWatcher.ConfigChangeEvent(
- "800", ConfigChangeWatcher.EventType.ADD);
-
- traceLatencyThresholdsAndWatcher.notify(value3);
-
Assert.assertEquals(traceLatencyThresholdsAndWatcher.getSlowTraceSegmentThreshold(),
800);
- Assert.assertEquals(traceLatencyThresholdsAndWatcher.value(), "800");
-
- ConfigChangeWatcher.ConfigChangeEvent value4 = new
ConfigChangeWatcher.ConfigChangeEvent(
- "abc", ConfigChangeWatcher.EventType.MODIFY);
-
- traceLatencyThresholdsAndWatcher.notify(value4);
-
Assert.assertEquals(traceLatencyThresholdsAndWatcher.getSlowTraceSegmentThreshold(),
800);
- Assert.assertEquals(traceLatencyThresholdsAndWatcher.value(), "800");
- }
-
- public static class MockConfigWatcherRegister extends
ConfigWatcherRegister {
-
- public MockConfigWatcherRegister(long syncPeriod) {
- super(syncPeriod);
- }
-
- @Override
- public Optional<ConfigTable> readConfig(Set<String> keys) {
- ConfigTable table = new ConfigTable();
- table.add(new
ConfigTable.ConfigItem("agent-analyzer.default.slowTraceSegmentThreshold",
"3000"));
- return Optional.of(table);
- }
-
- @Override
- public Optional<GroupConfigTable> readGroupConfig(final Set<String>
keys) {
- return Optional.empty();
- }
- }
-
-}
diff --git
a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcherTest.java
b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcherTest.java
deleted file mode 100644
index 5e325b1..0000000
---
a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcherTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.analyzer.provider.trace;
-
-import
org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider;
-import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
-import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
-import
org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
-import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.util.Optional;
-import java.util.Set;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-@RunWith(MockitoJUnitRunner.class)
-public class TraceSampleRateWatcherTest {
- private AnalyzerModuleProvider provider;
-
- @Before
- public void init() {
- provider = new AnalyzerModuleProvider();
- }
-
- @Test
- public void testInit() {
- TraceSampleRateWatcher traceSampleRateWatcher = new
TraceSampleRateWatcher(provider);
- Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 10000);
- Assert.assertEquals(traceSampleRateWatcher.value(), "10000");
- }
-
- @Test(timeout = 20000)
- public void testDynamicUpdate() throws InterruptedException {
- ConfigWatcherRegister register = new MockConfigWatcherRegister(3);
-
- TraceSampleRateWatcher watcher = new TraceSampleRateWatcher(provider);
- register.registerConfigChangeWatcher(watcher);
- register.start();
-
- while (watcher.getSampleRate() == 10000) {
- Thread.sleep(2000);
- }
- assertThat(watcher.getSampleRate(), is(9000));
- assertThat(provider.getModuleConfig().getSampleRate(), is(10000));
- }
-
- @Test
- public void testNotify() {
- TraceSampleRateWatcher traceSampleRateWatcher = new
TraceSampleRateWatcher(provider);
- ConfigChangeWatcher.ConfigChangeEvent value1 = new
ConfigChangeWatcher.ConfigChangeEvent(
- "8000", ConfigChangeWatcher.EventType.MODIFY);
-
- traceSampleRateWatcher.notify(value1);
- Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 8000);
- Assert.assertEquals(traceSampleRateWatcher.value(), "8000");
-
- ConfigChangeWatcher.ConfigChangeEvent value2 = new
ConfigChangeWatcher.ConfigChangeEvent(
- "8000", ConfigChangeWatcher.EventType.DELETE);
-
- traceSampleRateWatcher.notify(value2);
- Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 10000);
- Assert.assertEquals(traceSampleRateWatcher.value(), "10000");
-
- ConfigChangeWatcher.ConfigChangeEvent value3 = new
ConfigChangeWatcher.ConfigChangeEvent(
- "500", ConfigChangeWatcher.EventType.ADD);
-
- traceSampleRateWatcher.notify(value3);
- Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 500);
- Assert.assertEquals(traceSampleRateWatcher.value(), "500");
-
- ConfigChangeWatcher.ConfigChangeEvent value4 = new
ConfigChangeWatcher.ConfigChangeEvent(
- "abc", ConfigChangeWatcher.EventType.MODIFY);
-
- traceSampleRateWatcher.notify(value4);
- Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 500);
- Assert.assertEquals(traceSampleRateWatcher.value(), "500");
- }
-
- public static class MockConfigWatcherRegister extends
ConfigWatcherRegister {
-
- public MockConfigWatcherRegister(long syncPeriod) {
- super(syncPeriod);
- }
-
- @Override
- public Optional<ConfigTable> readConfig(Set<String> keys) {
- ConfigTable table = new ConfigTable();
- table.add(new
ConfigTable.ConfigItem("agent-analyzer.default.sampleRate", "9000"));
- return Optional.of(table);
- }
-
- @Override
- public Optional<GroupConfigTable> readGroupConfig(final Set<String>
keys) {
- return Optional.empty();
- }
- }
-
-}
diff --git
a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSamplingPolicyWatcherTest.java
b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSamplingPolicyWatcherTest.java
new file mode 100644
index 0000000..43e036b
--- /dev/null
+++
b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSamplingPolicyWatcherTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.analyzer.provider.trace;
+
+import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
+import
org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider;
+import
org.apache.skywalking.oap.server.analyzer.provider.trace.sampling.SamplingPolicy;
+import
org.apache.skywalking.oap.server.analyzer.provider.trace.sampling.SamplingPolicySettings;
+import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
+import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
+import
org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
+import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TraceSamplingPolicyWatcherTest {
+
+ private AnalyzerModuleProvider provider;
+ private AnalyzerModuleConfig moduleConfig;
+
+ @Before
+ public void init() {
+ provider = new AnalyzerModuleProvider();
+ moduleConfig = new AnalyzerModuleConfig();
+
moduleConfig.setTraceSamplingPolicySettingsFile("trace-sampling-policy-settings.yml");
+ }
+
+ @Test
+ public void testStaticConfigInit() {
+ TraceSamplingPolicyWatcher watcher = new
TraceSamplingPolicyWatcher(moduleConfig, provider);
+ // default sample = 10000
+ globalDefaultSamplingRateEquals(watcher, 9999);
+ }
+
+ @Test(timeout = 20000)
+ public void testTraceLatencyThresholdDynamicUpdate() throws
InterruptedException {
+ ConfigWatcherRegister register = new
TraceLatencyThresholdMockConfigWatcherRegister(3);
+
+ TraceSamplingPolicyWatcher watcher = new
TraceSamplingPolicyWatcher(moduleConfig, provider);
+ register.registerConfigChangeWatcher(watcher);
+ register.start();
+ // The default duration is -1, so a 3000 ms segment is not sampled until the threshold is updated to 3000
+ while (!watcher.shouldSample("", 10000, 3000)) {
+ Thread.sleep(2000);
+ }
+ Assert.assertTrue(watcher.shouldSample("", 10000, 3001));
+ }
+
+ @Test
+ public void testTraceLatencyThresholdNotify() {
+ TraceSamplingPolicyWatcher watcher = new
TraceSamplingPolicyWatcher(moduleConfig, provider);
+ ConfigChangeWatcher.ConfigChangeEvent value1 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " duration: 8000", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value1);
+ globalDefaultDurationEquals(watcher, 8000);
+ Assert.assertEquals(watcher.value(), "default:\n" +
+ " duration: 8000");
+
+ ConfigChangeWatcher.ConfigChangeEvent value2 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " duration: 8000", ConfigChangeWatcher.EventType.DELETE);
+
+ watcher.notify(value2);
+ Assert.assertEquals(watcher.value(), null);
+
+ ConfigChangeWatcher.ConfigChangeEvent value3 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " duration: 800", ConfigChangeWatcher.EventType.ADD);
+
+ watcher.notify(value3);
+ globalDefaultDurationEquals(watcher, 800);
+ Assert.assertEquals(watcher.value(), "default:\n" +
+ " duration: 800");
+
+ ConfigChangeWatcher.ConfigChangeEvent value4 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " duration: abc", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value4);
+ globalDefaultDurationEquals(watcher, 800);
+ Assert.assertEquals(watcher.value(), "default:\n" +
+ " duration: 800");
+
+ ConfigChangeWatcher.ConfigChangeEvent value5 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " rate: abc\n" +
+ " duration: 900", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value5);
+ globalDefaultDurationEquals(watcher, 800);
+ Assert.assertEquals(watcher.value(), "default:\n" +
+ " duration: 800");
+ }
+
+ public static class TraceLatencyThresholdMockConfigWatcherRegister extends
ConfigWatcherRegister {
+
+ public TraceLatencyThresholdMockConfigWatcherRegister(long syncPeriod)
{
+ super(syncPeriod);
+ }
+
+ @Override
+ public Optional<ConfigTable> readConfig(Set<String> keys) {
+ ConfigTable table = new ConfigTable();
+ table.add(new
ConfigTable.ConfigItem("agent-analyzer.default.traceSamplingPolicy",
"default:\n" +
+ " duration: 3000"));
+ return Optional.of(table);
+ }
+
+ @Override
+ public Optional<GroupConfigTable> readGroupConfig(final Set<String>
keys) {
+ return Optional.empty();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testDefaultSampleRateDynamicUpdate() throws
InterruptedException {
+ ConfigWatcherRegister register = new
DefaultSampleRateMockConfigWatcherRegister(3);
+
+ TraceSamplingPolicyWatcher watcher = new
TraceSamplingPolicyWatcher(moduleConfig, provider);
+ register.registerConfigChangeWatcher(watcher);
+ register.start();
+ // The default rate is 10000, so sample 9000 is accepted until the rate is updated to 9000
+ while (watcher.shouldSample("", 9000, -1)) {
+ Thread.sleep(2000);
+ }
+ globalDefaultSamplingRateEquals(watcher, 8999);
+ }
+
+ @Test
+ public void testDefaultSampleRateNotify() {
+ TraceSamplingPolicyWatcher watcher = new
TraceSamplingPolicyWatcher(moduleConfig, provider);
+ ConfigChangeWatcher.ConfigChangeEvent value1 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " rate: 8000", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value1);
+ globalDefaultSamplingRateEquals(watcher, 7999);
+ Assert.assertEquals(watcher.value(), "default:\n" +
+ " rate: 8000");
+
+ ConfigChangeWatcher.ConfigChangeEvent value2 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " rate: 1000", ConfigChangeWatcher.EventType.DELETE);
+
+ watcher.notify(value2);
+ globalDefaultSamplingRateEquals(watcher, 9999);
+ Assert.assertEquals(watcher.value(), null);
+
+ ConfigChangeWatcher.ConfigChangeEvent value3 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " rate: 500", ConfigChangeWatcher.EventType.ADD);
+
+ watcher.notify(value3);
+ globalDefaultSamplingRateEquals(watcher, 499);
+ Assert.assertEquals(watcher.value(), "default:\n" +
+ " rate: 500");
+
+ ConfigChangeWatcher.ConfigChangeEvent value4 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " rate: abc", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value4);
+ globalDefaultSamplingRateEquals(watcher, 499);
+ Assert.assertEquals(watcher.value(), "default:\n" +
+ " rate: 500");
+
+ ConfigChangeWatcher.ConfigChangeEvent value5 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "default:\n" +
+ " rate: 400\n" +
+ " duration: abc", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value5);
+ globalDefaultSamplingRateEquals(watcher, 499);
+ Assert.assertEquals(watcher.value(), "default:\n" +
+ " rate: 500");
+ }
+
+ public static class DefaultSampleRateMockConfigWatcherRegister extends
ConfigWatcherRegister {
+
+ public DefaultSampleRateMockConfigWatcherRegister(long syncPeriod) {
+ super(syncPeriod);
+ }
+
+ @Override
+ public Optional<ConfigTable> readConfig(Set<String> keys) {
+ ConfigTable table = new ConfigTable();
+ table.add(new
ConfigTable.ConfigItem("agent-analyzer.default.traceSamplingPolicy",
"default:\n" +
+ " rate: 9000"));
+ return Optional.of(table);
+ }
+
+ @Override
+ public Optional<GroupConfigTable> readGroupConfig(final Set<String>
keys) {
+ return Optional.empty();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testServiceSampleRateDynamicUpdate() throws
InterruptedException {
+ ConfigWatcherRegister register = new
ServiceMockConfigWatcherRegister(3);
+
+ TraceSamplingPolicyWatcher watcher = new
TraceSamplingPolicyWatcher(moduleConfig, provider);
+ provider.getModuleConfig().setTraceSamplingPolicyWatcher(watcher);
+ register.registerConfigChangeWatcher(watcher);
+ register.start();
+
+ while (getSamplingPolicy("serverName1", watcher) == null) {
+ Thread.sleep(1000);
+ }
+
+ SamplingPolicy samplingPolicy = getSamplingPolicy("serverName1",
watcher);
+ Assert.assertEquals(samplingPolicy.getRate().intValue(), 2000);
+ Assert.assertEquals(samplingPolicy.getDuration().intValue(), 30000);
+ Assert.assertEquals(getSamplingPolicy("serverName1",
provider.getModuleConfig().getTraceSamplingPolicyWatcher())
+ .getRate()
+ .intValue(), 2000);
+ }
+
+ @Test
+ public void testServiceSampleRateNotify() {
+ TraceSamplingPolicyWatcher watcher = new
TraceSamplingPolicyWatcher(moduleConfig, provider);
+ ConfigChangeWatcher.ConfigChangeEvent value1 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "services:\n" +
+ " - name: serverName1\n" +
+ " rate: 8000\n" +
+ " duration: 20000", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value1);
+
+ Assert.assertEquals(getSamplingPolicy("serverName1",
watcher).getRate().intValue(), 8000);
+ Assert.assertEquals(getSamplingPolicy("serverName1",
watcher).getDuration().intValue(), 20000);
+ Assert.assertEquals(watcher.value(), "services:\n" +
+ " - name: serverName1\n" +
+ " rate: 8000\n" +
+ " duration: 20000");
+
+ // use serverName1's sampling rate
+ Assert.assertTrue(watcher.shouldSample("serverName1", 7999, -1));
+ Assert.assertTrue(watcher.shouldSample("serverName1", 10000, 20000));
+
+ ConfigChangeWatcher.ConfigChangeEvent value2 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "", ConfigChangeWatcher.EventType.DELETE);
+
+ watcher.notify(value2);
+
+ Assert.assertNull(getSamplingPolicy("serverName1", watcher));
+ // use global sampling rate
+ Assert.assertTrue(watcher.shouldSample("serverName1", 9999, -1));
+ Assert.assertFalse(watcher.shouldSample("serverName1", 10000, 1));
+
+ Assert.assertEquals(watcher.value(), null);
+
+ ConfigChangeWatcher.ConfigChangeEvent value3 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "services:\n" +
+ " - name: serverName1\n" +
+ " rate: 8000\n" +
+ " duration: 20000", ConfigChangeWatcher.EventType.ADD);
+
+ watcher.notify(value3);
+ Assert.assertEquals(getSamplingPolicy("serverName1",
watcher).getRate().intValue(), 8000);
+ Assert.assertEquals(getSamplingPolicy("serverName1",
watcher).getDuration().intValue(), 20000);
+ Assert.assertTrue(watcher.shouldSample("serverName1", 7999, -1));
+ Assert.assertTrue(watcher.shouldSample("serverName1", 10000, 20000));
+
+ Assert.assertEquals(watcher.value(), "services:\n" +
+ " - name: serverName1\n" +
+ " rate: 8000\n" +
+ " duration: 20000");
+
+ ConfigChangeWatcher.ConfigChangeEvent value4 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "services:\n" +
+ " - name: serverName1\n" +
+ " rate: 9000\n" +
+ " duration: 30000", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value4);
+ Assert.assertEquals(getSamplingPolicy("serverName1",
watcher).getRate().intValue(), 9000);
+ Assert.assertEquals(getSamplingPolicy("serverName1",
watcher).getDuration().intValue(), 30000);
+ Assert.assertTrue(watcher.shouldSample("serverName1", 8999, -1));
+ Assert.assertTrue(watcher.shouldSample("serverName1", 10000, 30000));
+
+ Assert.assertEquals(watcher.value(), "services:\n" +
+ " - name: serverName1\n" +
+ " rate: 9000\n" +
+ " duration: 30000");
+
+ ConfigChangeWatcher.ConfigChangeEvent value5 = new
ConfigChangeWatcher.ConfigChangeEvent(
+ "services:\n" +
+ " - name: serverName1\n" +
+ " rate: 8000\n" +
+ " duration: abc", ConfigChangeWatcher.EventType.MODIFY);
+
+ watcher.notify(value5);
+ Assert.assertEquals(getSamplingPolicy("serverName1",
watcher).getRate().intValue(), 9000);
+ Assert.assertEquals(getSamplingPolicy("serverName1",
watcher).getDuration().intValue(), 30000);
+ Assert.assertTrue(watcher.shouldSample("serverName1", 8999, -1));
+ Assert.assertTrue(watcher.shouldSample("serverName1", 10000, 30000));
+
+ Assert.assertEquals(watcher.value(), "services:\n" +
+ " - name: serverName1\n" +
+ " rate: 9000\n" +
+ " duration: 30000");
+
+ }
+
+ public static class ServiceMockConfigWatcherRegister extends
ConfigWatcherRegister {
+
+ public ServiceMockConfigWatcherRegister(long syncPeriod) {
+ super(syncPeriod);
+ }
+
+ @Override
+ public Optional<ConfigTable> readConfig(Set<String> keys) {
+ ConfigTable table = new ConfigTable();
+ table.add(new
ConfigTable.ConfigItem("agent-analyzer.default.traceSamplingPolicy",
"services:\n" +
+ " - name: serverName1\n" +
+ " rate: 2000\n" +
+ " duration: 30000"));
+ return Optional.of(table);
+ }
+
+ @Override
+ public Optional<GroupConfigTable> readGroupConfig(final Set<String>
keys) {
+ return Optional.empty();
+ }
+ }
+
+ private void globalDefaultSamplingRateEquals(TraceSamplingPolicyWatcher
watcher, int sample) {
+ Assert.assertTrue(watcher.shouldSample("", sample, -1));
+ Assert.assertFalse(watcher.shouldSample("", sample + 1, -1));
+ }
+
+ private void globalDefaultDurationEquals(TraceSamplingPolicyWatcher
watcher, int duration) {
+ Assert.assertTrue(watcher.shouldSample("", 10000, duration));
+ Assert.assertFalse(watcher.shouldSample("", 10000, duration - 1));
+ }
+
+ private SamplingPolicy getSamplingPolicy(String service,
TraceSamplingPolicyWatcher watcher) {
+ AtomicReference<SamplingPolicySettings> samplingPolicySettings =
Whitebox.getInternalState(
+ watcher, "samplingPolicySettings");
+ return samplingPolicySettings.get().get(service);
+ }
+}
diff --git
a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettingsReaderTest.java
b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettingsReaderTest.java
new file mode 100644
index 0000000..ffec602
--- /dev/null
+++
b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/sampling/SamplingPolicySettingsReaderTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.analyzer.provider.trace.sampling;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SamplingPolicySettingsReaderTest {
+
+ @Test
+ public void testReadPolicySettings() {
+ SamplingPolicySettingsReader reader = new
SamplingPolicySettingsReader(this.getClass()
+
.getClassLoader()
+
.getResourceAsStream(
+
"trace-sampling-policy-settings.yml"));
+ SamplingPolicySettings settings = reader.readSettings();
+ Assert.assertEquals(settings.getDefaultPolicy().getRate().intValue(),
10000);
+
Assert.assertEquals(settings.getDefaultPolicy().getDuration().intValue(), -1);
+
+ Assert.assertEquals(settings.get("name1").getRate().intValue(), 1000);
+ Assert.assertEquals(settings.get("name1").getDuration().intValue(),
20000);
+
+ Assert.assertEquals(settings.get("name2").getRate().intValue(), 2000);
+ Assert.assertEquals(settings.get("name2").getDuration().intValue(),
30000);
+ }
+}
\ No newline at end of file
diff --git
a/oap-server/analyzer/agent-analyzer/src/test/resources/trace-sampling-policy-settings.yml
b/oap-server/analyzer/agent-analyzer/src/test/resources/trace-sampling-policy-settings.yml
new file mode 100755
index 0000000..e1f681e
--- /dev/null
+++
b/oap-server/analyzer/agent-analyzer/src/test/resources/trace-sampling-policy-settings.yml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+default:
+ rate: 10000
+ duration: -1
+services:
+ - name: name1
+ rate: 1000
+ duration: 20000
+ - name: name2
+ rate: 2000
+ duration: 30000
\ No newline at end of file
diff --git a/oap-server/server-bootstrap/pom.xml
b/oap-server/server-bootstrap/pom.xml
index a9e8964..9a8db46 100644
--- a/oap-server/server-bootstrap/pom.xml
+++ b/oap-server/server-bootstrap/pom.xml
@@ -276,6 +276,7 @@
<exclude>service-apdex-threshold.yml</exclude>
<exclude>endpoint-name-grouping.yml</exclude>
<exclude>metadata-service-mapping.yaml</exclude>
+ <exclude>trace-sampling-policy-settings.yml</exclude>
<exclude>oal/</exclude>
<exclude>fetcher-prom-rules/</exclude>
<exclude>envoy-metrics-rules/</exclude>
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml
b/oap-server/server-bootstrap/src/main/resources/application.yml
index 28af6ef..f06d551 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -269,14 +269,14 @@ storage:
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
default:
- sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is
1/10000. 10000 means 100% sample in default.
+ # The default sampling rate and the default trace latency threshold are
configured in the 'traceSamplingPolicySettingsFile' file.
+ traceSamplingPolicySettingsFile:
${SW_TRACE_SAMPLING_POLICY_SETTINGS_FILE:trace-sampling-policy-settings.yml}
slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} #
The slow database access thresholds. Unit ms.
forceSampleErrorSegment: ${SW_FORCE_SAMPLE_ERROR_SEGMENT:true} # When
sampling mechanism active, this config can open(true) force save some error
segment. true is default.
segmentStatusAnalysisStrategy:
${SW_SEGMENT_STATUS_ANALYSIS_STRATEGY:FROM_SPAN_STATUS} # Determine the final
segment status from the status of spans. Available values are
`FROM_SPAN_STATUS` , `FROM_ENTRY_SPAN` and `FROM_FIRST_SPAN`.
`FROM_SPAN_STATUS` represents the segment status would be error if any span is
in error status. `FROM_ENTRY_SPAN` means the segment status would be determined
by the status of entry spans only. `FROM_FIRST_SPAN` means the segment status
would be determine [...]
# Nginx and Envoy agents can't get the real remote address.
# Exit spans with the component in the list would not generate the
client-side instance relation metrics.
noUpstreamRealAddressAgents: ${SW_NO_UPSTREAM_REAL_ADDRESS:6000,9000}
- slowTraceSegmentThreshold: ${SW_SLOW_TRACE_SEGMENT_THRESHOLD:-1} # Setting
this threshold about the latency would make the slow trace segments sampled if
they cost more time, even the sampling mechanism activated. The default value
is `-1`, which means would not sample slow traces. Unit, millisecond.
meterAnalyzerActiveFiles: ${SW_METER_ANALYZER_ACTIVE_FILES:} # Which files
could be meter analyzed, files split by ","
log-analyzer:
diff --git
a/oap-server/server-bootstrap/src/main/resources/trace-sampling-policy-settings.yml
b/oap-server/server-bootstrap/src/main/resources/trace-sampling-policy-settings.yml
new file mode 100755
index 0000000..7cce88a
--- /dev/null
+++
b/oap-server/server-bootstrap/src/main/resources/trace-sampling-policy-settings.yml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+default:
+ # Default sampling rate, which replaces 'agent-analyzer.default.sampleRate'.
+ # The sample rate precision is 1/10000. The default, 10000, samples 100% of traces.
+ rate: 10000
+ # Default trace latency threshold, which replaces
'agent-analyzer.default.slowTraceSegmentThreshold'.
+ # Slow trace segments whose latency reaches this threshold are sampled even
when the sampling mechanism is activated. The default value is `-1`, which
means slow traces are not sampled. Unit: millisecond.
+ duration: -1
+#services:
+# - name: serverName
+# rate: 1000 # Sampling rate of this specific service
+# duration: 10000 # Trace latency threshold for trace sampling for this
specific service
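
The assertions in TraceSamplingPolicyWatcherTest above imply a simple decision rule: a segment is kept when its sample value is below the applicable rate, or when a latency threshold is enabled and the segment duration reaches it. A service-level policy takes precedence over the default one. The following is a minimal sketch of that rule inferred only from the test expectations, not the code added by this commit. The class name SamplingDecisionSketch, its nested Policy type, and putService are hypothetical, and the sketch does not model a service entry that sets only one of rate or duration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the rule the tests exercise; not SkyWalking code.
final class SamplingDecisionSketch {

    // One policy: rate has 1/10000 precision, duration is in ms (-1 disables it).
    static final class Policy {
        final int rate;
        final int duration;

        Policy(int rate, int duration) {
            this.rate = rate;
            this.duration = duration;
        }
    }

    private final Policy defaultPolicy;
    private final Map<String, Policy> services = new HashMap<>();

    SamplingDecisionSketch(Policy defaultPolicy) {
        this.defaultPolicy = defaultPolicy;
    }

    // Register a service-level policy that overrides the default.
    void putService(String name, Policy policy) {
        services.put(name, policy);
    }

    // Keep the segment if it falls under the rate or is slow enough.
    boolean shouldSample(String service, int sample, int duration) {
        Policy policy = services.getOrDefault(service, defaultPolicy);
        boolean withinRate = sample < policy.rate;
        boolean slow = policy.duration > -1 && duration >= policy.duration;
        return withinRate || slow;
    }
}
```

With a default of rate 10000 and duration -1, and a service entry of rate 8000 and duration 20000, this sketch gives the same answers as the shouldSample assertions in testServiceSampleRateNotify.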