This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 065a384  Trace buffer test success. (#1733)
065a384 is described below

commit 065a384f1034795a58ad39c5a9fd2e416f89adb7
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Mon Oct 8 23:17:15 2018 +0800

    Trace buffer test success. (#1733)
---
 .../oap/server/library/buffer/BufferStream.java    |  6 ++---
 .../SegmentStandardizationWorker.java              | 29 ++++++++--------------
 .../server/receiver/trace/mock/AgentDataMock.java  |  8 ++++++
 3 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
index 7929436..f286a83 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
@@ -68,7 +68,7 @@ public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
     }
 
     private void tryLock(File directory) {
-        logger.info("Try to lock buffer directory, directory is: " + absolutePath);
+        logger.info("Try to lock buffer directory, directory is: " + directory.getAbsolutePath());
         FileLock lock = null;
 
         try {
@@ -78,10 +78,10 @@ public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
         }
 
         if (lock == null) {
-            throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath);
+            throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + directory.getAbsolutePath());
         }
 
-        logger.info("Lock buffer directory successfully, directory is: " + absolutePath);
+        logger.info("Lock buffer directory successfully, directory is: " + directory.getAbsolutePath());
     }
 
     public static class Builder<MESSAGE_TYPE extends GeneratedMessageV3> {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index d696a36..a30c7a4 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -35,13 +35,11 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
 
     private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
 
-    private final BufferStream<UpstreamSegment> stream;
+    private final DataCarrier<SegmentStandardization> dataCarrier;
 
     public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
         int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
         super(Integer.MAX_VALUE);
-        DataCarrier<SegmentStandardization> dataCarrier = new DataCarrier<>(1, 1024);
-        dataCarrier.consume(new Consumer(this), 1);
 
         BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path);
         builder.cleanWhenRestart(cleanWhenRestart);
@@ -50,21 +48,24 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
         builder.parser(UpstreamSegment.parser());
         builder.callBack(segmentParse);
 
-        stream = builder.build();
+        BufferStream<UpstreamSegment> stream = builder.build();
         stream.initialize();
+
+        dataCarrier = new DataCarrier<>(1, 1024);
+        dataCarrier.consume(new Consumer(stream), 1);
     }
 
     @Override
     public void in(SegmentStandardization standardization) {
-        stream.write(standardization.getUpstreamSegment());
+        dataCarrier.produce(standardization);
     }
 
     private class Consumer implements IConsumer<SegmentStandardization> {
 
-        private final SegmentStandardizationWorker aggregator;
+        private final BufferStream<UpstreamSegment> stream;
 
-        private Consumer(SegmentStandardizationWorker aggregator) {
-            this.aggregator = aggregator;
+        private Consumer(BufferStream<UpstreamSegment> stream) {
+            this.stream = stream;
         }
 
         @Override
@@ -73,16 +74,8 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
 
         @Override
         public void consume(List<SegmentStandardization> data) {
-            Iterator<SegmentStandardization> inputIterator = data.iterator();
-
-            int i = 0;
-            while (inputIterator.hasNext()) {
-                SegmentStandardization indicator = inputIterator.next();
-                i++;
-                if (i == data.size()) {
-                    indicator.getEndOfBatchContext().setEndOfBatch(true);
-                }
-                aggregator.in(indicator);
+            for (SegmentStandardization aData : data) {
+                stream.write(aData.getUpstreamSegment());
             }
         }
 
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
index d19be0e..a96e56b 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -52,6 +52,14 @@ public class AgentDataMock {
         UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create();
         providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, true);
 
+        TimeUnit.SECONDS.sleep(10);
+
+        globalTraceId = UniqueIdBuilder.INSTANCE.create();
+        consumerSegmentId = UniqueIdBuilder.INSTANCE.create();
+        providerSegmentId = UniqueIdBuilder.INSTANCE.create();
+        consumerMock.mock(streamObserver, globalTraceId, consumerSegmentId, startTimestamp, false);
+        providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, false);
+
         streamObserver.onCompleted();
         while (!IS_COMPLETED) {
             TimeUnit.MILLISECONDS.sleep(500);

Reply via email to