This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 065a384 Trace buffer test success. (#1733)
065a384 is described below
commit 065a384f1034795a58ad39c5a9fd2e416f89adb7
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Mon Oct 8 23:17:15 2018 +0800
Trace buffer test success. (#1733)
---
.../oap/server/library/buffer/BufferStream.java | 6 ++---
.../SegmentStandardizationWorker.java | 29 ++++++++--------------
.../server/receiver/trace/mock/AgentDataMock.java | 8 ++++++
3 files changed, 22 insertions(+), 21 deletions(-)
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
index 7929436..f286a83 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
@@ -68,7 +68,7 @@ public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
}
private void tryLock(File directory) {
- logger.info("Try to lock buffer directory, directory is: " + absolutePath);
+ logger.info("Try to lock buffer directory, directory is: " + directory.getAbsolutePath());
FileLock lock = null;
try {
@@ -78,10 +78,10 @@ public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
}
if (lock == null) {
- throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath);
+ throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + directory.getAbsolutePath());
}
- logger.info("Lock buffer directory successfully, directory is: " + absolutePath);
+ logger.info("Lock buffer directory successfully, directory is: " + directory.getAbsolutePath());
}
public static class Builder<MESSAGE_TYPE extends GeneratedMessageV3> {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index d696a36..a30c7a4 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -35,13 +35,11 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private static final Logger logger =
LoggerFactory.getLogger(SegmentStandardizationWorker.class);
- private final BufferStream<UpstreamSegment> stream;
+ private final DataCarrier<SegmentStandardization> dataCarrier;
public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart)
throws IOException {
super(Integer.MAX_VALUE);
- DataCarrier<SegmentStandardization> dataCarrier = new DataCarrier<>(1, 1024);
- dataCarrier.consume(new Consumer(this), 1);
BufferStream.Builder<UpstreamSegment> builder = new
BufferStream.Builder<>(path);
builder.cleanWhenRestart(cleanWhenRestart);
@@ -50,21 +48,24 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
builder.parser(UpstreamSegment.parser());
builder.callBack(segmentParse);
- stream = builder.build();
+ BufferStream<UpstreamSegment> stream = builder.build();
stream.initialize();
+
+ dataCarrier = new DataCarrier<>(1, 1024);
+ dataCarrier.consume(new Consumer(stream), 1);
}
@Override
public void in(SegmentStandardization standardization) {
- stream.write(standardization.getUpstreamSegment());
+ dataCarrier.produce(standardization);
}
private class Consumer implements IConsumer<SegmentStandardization> {
- private final SegmentStandardizationWorker aggregator;
+ private final BufferStream<UpstreamSegment> stream;
- private Consumer(SegmentStandardizationWorker aggregator) {
- this.aggregator = aggregator;
+ private Consumer(BufferStream<UpstreamSegment> stream) {
+ this.stream = stream;
}
@Override
@@ -73,16 +74,8 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
@Override
public void consume(List<SegmentStandardization> data) {
- Iterator<SegmentStandardization> inputIterator = data.iterator();
-
- int i = 0;
- while (inputIterator.hasNext()) {
- SegmentStandardization indicator = inputIterator.next();
- i++;
- if (i == data.size()) {
- indicator.getEndOfBatchContext().setEndOfBatch(true);
- }
- aggregator.in(indicator);
+ for (SegmentStandardization aData : data) {
+ stream.write(aData.getUpstreamSegment());
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
index d19be0e..a96e56b 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -52,6 +52,14 @@ public class AgentDataMock {
UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create();
providerMock.mock(streamObserver, globalTraceId, providerSegmentId,
consumerSegmentId, startTimestamp, true);
+ TimeUnit.SECONDS.sleep(10);
+
+ globalTraceId = UniqueIdBuilder.INSTANCE.create();
+ consumerSegmentId = UniqueIdBuilder.INSTANCE.create();
+ providerSegmentId = UniqueIdBuilder.INSTANCE.create();
+ consumerMock.mock(streamObserver, globalTraceId, consumerSegmentId, startTimestamp, false);
+ providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, false);
+
streamObserver.onCompleted();
while (!IS_COMPLETED) {
TimeUnit.MILLISECONDS.sleep(500);