This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 9025359 Fixed a mistake that inject v1 segment producer into v2
standardization worker. (#3090)
9025359 is described below
commit 902535916343fa35740509e1a2200c6f8ba07364
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Mon Jul 15 23:13:04 2019 +0800
Fixed a mistake that inject v1 segment producer into v2 standardization
worker. (#3090)
---
.../provider/DBLatencyThresholdsAndWatcher.java | 7 +++---
.../trace/provider/TraceModuleProvider.java | 28 ++++++++--------------
.../SegmentStandardizationWorker.java | 7 +++---
3 files changed, 17 insertions(+), 25 deletions(-)
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java
index a2b2dc3..458af2c 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java
@@ -21,6 +21,7 @@ package
org.apache.skywalking.oap.server.receiver.trace.provider;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
/**
@@ -30,10 +31,10 @@ public class DBLatencyThresholdsAndWatcher extends
ConfigChangeWatcher {
private AtomicReference<Map<String, Integer>> thresholds;
private AtomicReference<String> settingsString;
- DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider)
{
+ public DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider
provider) {
super(TraceModule.NAME, provider, "slowDBAccessThreshold");
- thresholds = new AtomicReference(new HashMap<>());
- settingsString = new AtomicReference<>("");
+ thresholds = new AtomicReference<>(new HashMap<>());
+ settingsString = new AtomicReference<>(Const.EMPTY_STRING);
activeSetting(config);
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index c548408..a1502e3 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -66,25 +66,21 @@ public class TraceModuleProvider extends ModuleProvider {
moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds);
- SegmentParserListenerManager listenerManager = new
SegmentParserListenerManager();
- if (moduleConfig.isTraceAnalysis()) {
- listenerManager.add(new MultiScopesSpanListener.Factory());
- listenerManager.add(new ServiceMappingSpanListener.Factory());
- }
- listenerManager.add(new
SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
+ segmentProducer = new SegmentParse.Producer(getManager(),
listenerManager(), moduleConfig);
+ segmentProducerV2 = new SegmentParseV2.Producer(getManager(),
listenerManager(), moduleConfig);
- segmentProducer = new SegmentParse.Producer(getManager(),
listenerManager, moduleConfig);
+ this.registerServiceImplementation(ISegmentParserService.class, new
SegmentParserServiceImpl(segmentProducerV2));
+ }
- listenerManager = new SegmentParserListenerManager();
+ public SegmentParserListenerManager listenerManager() {
+ SegmentParserListenerManager listenerManager = new
SegmentParserListenerManager();
if (moduleConfig.isTraceAnalysis()) {
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
}
listenerManager.add(new
SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
- segmentProducerV2 = new SegmentParseV2.Producer(getManager(),
listenerManager, moduleConfig);
-
- this.registerServiceImplementation(ISegmentParserService.class, new
SegmentParserServiceImpl(segmentProducerV2));
+ return listenerManager;
}
@Override public void start() throws ModuleStartException {
@@ -98,15 +94,11 @@ public class TraceModuleProvider extends ModuleProvider {
grpcHandlerRegister.addHandler(new
TraceSegmentReportServiceHandler(segmentProducerV2, getManager()));
jettyHandlerRegister.addHandler(new
TraceSegmentServletHandler(segmentProducer));
- SegmentStandardizationWorker standardizationWorker = new
SegmentStandardizationWorker(getManager(), segmentProducer,
- moduleConfig.getBufferPath() + "v5",
moduleConfig.getBufferOffsetMaxFileSize(),
moduleConfig.getBufferDataMaxFileSize(),
moduleConfig.isBufferFileCleanWhenRestart(),
- false);
+ SegmentStandardizationWorker standardizationWorker = new
SegmentStandardizationWorker(getManager(), segmentProducer,
moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(),
moduleConfig.getBufferDataMaxFileSize(),
moduleConfig.isBufferFileCleanWhenRestart(), false);
segmentProducer.setStandardizationWorker(standardizationWorker);
- SegmentStandardizationWorker standardizationWorker2 = new
SegmentStandardizationWorker(getManager(), segmentProducer,
- moduleConfig.getBufferPath(),
moduleConfig.getBufferOffsetMaxFileSize(),
moduleConfig.getBufferDataMaxFileSize(),
moduleConfig.isBufferFileCleanWhenRestart(),
- true);
- segmentProducerV2.setStandardizationWorker(standardizationWorker2);
+ SegmentStandardizationWorker standardizationWorkerV2 = new
SegmentStandardizationWorker(getManager(), segmentProducerV2,
moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(),
moduleConfig.getBufferDataMaxFileSize(),
moduleConfig.isBufferFileCleanWhenRestart(), true);
+
segmentProducerV2.setStandardizationWorker(standardizationWorkerV2);
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index 5e363b0..92fc4a0 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -24,9 +24,8 @@ import
org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.buffer.BufferStream;
+import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import
org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
@@ -42,7 +41,7 @@ public class SegmentStandardizationWorker extends
AbstractWorker<SegmentStandard
private CounterMetrics traceBufferFileIn;
public SegmentStandardizationWorker(ModuleDefineHolder moduleDefineHolder,
- SegmentParse.Producer segmentParseCreator, String path, int
offsetFileMaxSize,
+ DataStreamReader.CallBack<UpstreamSegment> segmentParse, String path,
int offsetFileMaxSize,
int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws
IOException {
super(moduleDefineHolder);
@@ -51,7 +50,7 @@ public class SegmentStandardizationWorker extends
AbstractWorker<SegmentStandard
builder.dataFileMaxSize(dataFileMaxSize);
builder.offsetFileMaxSize(offsetFileMaxSize);
builder.parser(UpstreamSegment.parser());
- builder.callBack(segmentParseCreator);
+ builder.callBack(segmentParse);
BufferStream<UpstreamSegment> stream = builder.build();
stream.initialize();