This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 9025359  Fixed a mistake that inject v1 segment producer into v2 
standardization worker. (#3090)
9025359 is described below

commit 902535916343fa35740509e1a2200c6f8ba07364
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Mon Jul 15 23:13:04 2019 +0800

    Fixed a mistake that inject v1 segment producer into v2 standardization 
worker. (#3090)
---
 .../provider/DBLatencyThresholdsAndWatcher.java    |  7 +++---
 .../trace/provider/TraceModuleProvider.java        | 28 ++++++++--------------
 .../SegmentStandardizationWorker.java              |  7 +++---
 3 files changed, 17 insertions(+), 25 deletions(-)

diff --git 
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java
 
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java
index a2b2dc3..458af2c 100644
--- 
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java
+++ 
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java
@@ -21,6 +21,7 @@ package 
org.apache.skywalking.oap.server.receiver.trace.provider;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
+import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
 
 /**
@@ -30,10 +31,10 @@ public class DBLatencyThresholdsAndWatcher extends 
ConfigChangeWatcher {
     private AtomicReference<Map<String, Integer>> thresholds;
     private AtomicReference<String> settingsString;
 
-    DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider) 
{
+    public DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider 
provider) {
         super(TraceModule.NAME, provider, "slowDBAccessThreshold");
-        thresholds = new AtomicReference(new HashMap<>());
-        settingsString = new AtomicReference<>("");
+        thresholds = new AtomicReference<>(new HashMap<>());
+        settingsString = new AtomicReference<>(Const.EMPTY_STRING);
 
         activeSetting(config);
     }
diff --git 
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
 
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index c548408..a1502e3 100644
--- 
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ 
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -66,25 +66,21 @@ public class TraceModuleProvider extends ModuleProvider {
 
         moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds);
 
-        SegmentParserListenerManager listenerManager = new 
SegmentParserListenerManager();
-        if (moduleConfig.isTraceAnalysis()) {
-            listenerManager.add(new MultiScopesSpanListener.Factory());
-            listenerManager.add(new ServiceMappingSpanListener.Factory());
-        }
-        listenerManager.add(new 
SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
+        segmentProducer = new SegmentParse.Producer(getManager(), 
listenerManager(), moduleConfig);
+        segmentProducerV2 = new SegmentParseV2.Producer(getManager(), 
listenerManager(), moduleConfig);
 
-        segmentProducer = new SegmentParse.Producer(getManager(), 
listenerManager, moduleConfig);
+        this.registerServiceImplementation(ISegmentParserService.class, new 
SegmentParserServiceImpl(segmentProducerV2));
+    }
 
-        listenerManager = new SegmentParserListenerManager();
+    public SegmentParserListenerManager listenerManager() {
+        SegmentParserListenerManager listenerManager = new 
SegmentParserListenerManager();
         if (moduleConfig.isTraceAnalysis()) {
             listenerManager.add(new MultiScopesSpanListener.Factory());
             listenerManager.add(new ServiceMappingSpanListener.Factory());
         }
         listenerManager.add(new 
SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
 
-        segmentProducerV2 = new SegmentParseV2.Producer(getManager(), 
listenerManager, moduleConfig);
-
-        this.registerServiceImplementation(ISegmentParserService.class, new 
SegmentParserServiceImpl(segmentProducerV2));
+        return listenerManager;
     }
 
     @Override public void start() throws ModuleStartException {
@@ -98,15 +94,11 @@ public class TraceModuleProvider extends ModuleProvider {
             grpcHandlerRegister.addHandler(new 
TraceSegmentReportServiceHandler(segmentProducerV2, getManager()));
             jettyHandlerRegister.addHandler(new 
TraceSegmentServletHandler(segmentProducer));
 
-            SegmentStandardizationWorker standardizationWorker = new 
SegmentStandardizationWorker(getManager(), segmentProducer,
-                moduleConfig.getBufferPath() + "v5", 
moduleConfig.getBufferOffsetMaxFileSize(), 
moduleConfig.getBufferDataMaxFileSize(), 
moduleConfig.isBufferFileCleanWhenRestart(),
-                false);
+            SegmentStandardizationWorker standardizationWorker = new 
SegmentStandardizationWorker(getManager(), segmentProducer, 
moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), 
moduleConfig.getBufferDataMaxFileSize(), 
moduleConfig.isBufferFileCleanWhenRestart(), false);
             segmentProducer.setStandardizationWorker(standardizationWorker);
 
-            SegmentStandardizationWorker standardizationWorker2 = new 
SegmentStandardizationWorker(getManager(), segmentProducer,
-                moduleConfig.getBufferPath(), 
moduleConfig.getBufferOffsetMaxFileSize(), 
moduleConfig.getBufferDataMaxFileSize(), 
moduleConfig.isBufferFileCleanWhenRestart(),
-                true);
-            segmentProducerV2.setStandardizationWorker(standardizationWorker2);
+            SegmentStandardizationWorker standardizationWorkerV2 = new 
SegmentStandardizationWorker(getManager(), segmentProducerV2, 
moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), 
moduleConfig.getBufferDataMaxFileSize(), 
moduleConfig.isBufferFileCleanWhenRestart(), true);
+            
segmentProducerV2.setStandardizationWorker(standardizationWorkerV2);
         } catch (IOException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git 
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
 
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index 5e363b0..92fc4a0 100644
--- 
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ 
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -24,9 +24,8 @@ import 
org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.library.buffer.BufferStream;
+import org.apache.skywalking.oap.server.library.buffer.*;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import 
org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.*;
 import org.slf4j.*;
@@ -42,7 +41,7 @@ public class SegmentStandardizationWorker extends 
AbstractWorker<SegmentStandard
     private CounterMetrics traceBufferFileIn;
 
     public SegmentStandardizationWorker(ModuleDefineHolder moduleDefineHolder,
-        SegmentParse.Producer segmentParseCreator, String path, int 
offsetFileMaxSize,
+        DataStreamReader.CallBack<UpstreamSegment> segmentParse, String path, 
int offsetFileMaxSize,
         int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws 
IOException {
         super(moduleDefineHolder);
 
@@ -51,7 +50,7 @@ public class SegmentStandardizationWorker extends 
AbstractWorker<SegmentStandard
         builder.dataFileMaxSize(dataFileMaxSize);
         builder.offsetFileMaxSize(offsetFileMaxSize);
         builder.parser(UpstreamSegment.parser());
-        builder.callBack(segmentParseCreator);
+        builder.callBack(segmentParse);
 
         BufferStream<UpstreamSegment> stream = builder.build();
         stream.initialize();

Reply via email to