This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 9810c5e Improve buffer reader speed. (#2188)
9810c5e is described below
commit 9810c5e308e059a7f6fd913a74f80a4c261a28b0
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Mon Jan 21 04:21:45 2019 +0800
Improve buffer reader speed. (#2188)
* 1. Sleep 500 milliseconds after a batch re-call finish.
2. Re-create register lock index when the system property named debug is
setting.
3. Get the register inventory before lock to avoid increment the sequence
but not use.
4. Return the exchange flag after all the references and spans in one
segment parsed to reduce the number of segment parse.
5. Put the not exchanged segment into a collection then try to exchange no
more than 10 times because of the exchange is asynchronous.
6. Cache the segment object to avoid repeated deserialization.
#2185
* #2185
1. Sleep 500 milliseconds after a batch re-call finish.
2. Re-create register lock index when the system property named debug is
setting.
3. Get the register inventory before lock to avoid increment the sequence
but not use.
---
.../register/worker/RegisterPersistentWorker.java | 41 ++++++++-----
.../oap/server/library/buffer/BufferData.java | 38 ++++++++++++
.../library/buffer/BufferDataCollection.java | 57 ++++++++++++++++++
.../oap/server/library/buffer/BufferStream.java | 3 +-
.../server/library/buffer/DataStreamReader.java | 68 ++++++++++++++++------
.../library/buffer/BufferStreamTestCase.java | 14 ++---
.../receiver/mesh/MeshDataBufferFileCache.java | 9 ++-
.../trace/provider/parser/SegmentParse.java | 61 ++++++++++---------
.../trace/provider/parser/SegmentParseV2.java | 62 +++++++++++---------
.../standardization/ReferenceIdExchanger.java | 13 +++--
.../parser/standardization/SpanIdExchanger.java | 13 +++--
.../elasticsearch/lock/RegisterLockInstaller.java | 12 ++++
12 files changed, 283 insertions(+), 108 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 613619b..2ad37d2 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -70,25 +70,36 @@ public class RegisterPersistentWorker extends
AbstractWorker<RegisterSource> {
if (sources.size() > 1000 ||
registerSource.getEndOfBatchContext().isEndOfBatch()) {
sources.values().forEach(source -> {
- int sequence;
- if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) !=
Const.NONE) {
- try {
- RegisterSource dbSource = registerDAO.get(modelName,
source.id());
- if (Objects.nonNull(dbSource)) {
- if (dbSource.combine(source)) {
- registerDAO.forceUpdate(modelName, dbSource);
+ try {
+ RegisterSource dbSource = registerDAO.get(modelName,
source.id());
+ if (Objects.nonNull(dbSource)) {
+ if (dbSource.combine(source)) {
+ registerDAO.forceUpdate(modelName, dbSource);
+ }
+ } else {
+ int sequence;
+ if ((sequence =
registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) {
+ try {
+ dbSource = registerDAO.get(modelName,
source.id());
+ if (Objects.nonNull(dbSource)) {
+ if (dbSource.combine(source)) {
+ registerDAO.forceUpdate(modelName,
dbSource);
+ }
+ } else {
+ source.setSequence(sequence);
+ registerDAO.forceInsert(modelName, source);
+ }
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ } finally {
+ registerLockDAO.releaseLock(scope);
}
} else {
- source.setSequence(sequence);
- registerDAO.forceInsert(modelName, source);
+ logger.info("{} inventory register try lock and
increment sequence failure.", scope.name());
}
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- } finally {
- registerLockDAO.releaseLock(scope);
}
- } else {
- logger.info("{} inventory register try lock and increment
sequence failure.", scope.name());
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
}
});
sources.clear();
diff --git
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java
new file mode 100644
index 0000000..4ef90c5
--- /dev/null
+++
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferData.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.GeneratedMessageV3;
+import lombok.*;
+import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
+
+/**
+ * @author peng-yongsheng
+ */
+@Getter
+public class BufferData<MESSAGE_TYPE extends GeneratedMessageV3> {
+ private MESSAGE_TYPE messageType;
+ @Setter private TraceSegmentObject v1Segment;
+ @Setter private SegmentObject v2Segment;
+
+ public BufferData(MESSAGE_TYPE messageType) {
+ this.messageType = messageType;
+ }
+}
diff --git
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java
new file mode 100644
index 0000000..89e4846
--- /dev/null
+++
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferDataCollection.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.GeneratedMessageV3;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author peng-yongsheng
+ */
+public class BufferDataCollection<MESSAGE_TYPE extends GeneratedMessageV3> {
+
+ private AtomicInteger index = new AtomicInteger(0);
+ private final List<BufferData<MESSAGE_TYPE>> bufferDataList;
+
+ public BufferDataCollection(int size) {
+ this.bufferDataList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ bufferDataList.add(null);
+ }
+ }
+
+ public void add(BufferData<MESSAGE_TYPE> bufferData) {
+ bufferDataList.set(index.getAndIncrement(), bufferData);
+
+ }
+
+ public int size() {
+ return index.get();
+ }
+
+ public synchronized List<BufferData<MESSAGE_TYPE>> export() {
+ List<BufferData<MESSAGE_TYPE>> exportData = new
ArrayList<>(index.get());
+ for (int i = 0; i < index.get(); i++) {
+ exportData.add(bufferDataList.get(i));
+ }
+ index.set(0);
+ return exportData;
+ }
+}
diff --git
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
index f286a83..eeae8cc 100644
---
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
+++
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
@@ -121,7 +121,8 @@ public class BufferStream<MESSAGE_TYPE extends
GeneratedMessageV3> {
return this;
}
- public Builder<MESSAGE_TYPE>
callBack(DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
+ public Builder<MESSAGE_TYPE> callBack(
+ DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
this.callBack = callBack;
return this;
}
diff --git
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
index b8cf17b..12849bf 100644
---
a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
+++
b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.library.buffer;
import com.google.protobuf.*;
import java.io.*;
-import java.util.Objects;
+import java.util.*;
import java.util.concurrent.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter;
@@ -38,6 +38,8 @@ public class DataStreamReader<MESSAGE_TYPE extends
GeneratedMessageV3> {
private final Offset.ReadOffset readOffset;
private final Parser<MESSAGE_TYPE> parser;
private final CallBack<MESSAGE_TYPE> callBack;
+ private final int collectionSize = 100;
+ private final BufferDataCollection<MESSAGE_TYPE> bufferDataCollection;
private File readingFile;
private InputStream inputStream;
@@ -47,6 +49,7 @@ public class DataStreamReader<MESSAGE_TYPE extends
GeneratedMessageV3> {
this.readOffset = readOffset;
this.parser = parser;
this.callBack = callBack;
+ this.bufferDataCollection = new BufferDataCollection<>(collectionSize);
}
void initialize() {
@@ -114,25 +117,32 @@ public class DataStreamReader<MESSAGE_TYPE extends
GeneratedMessageV3> {
}
while (readOffset.getOffset() < readingFile.length()) {
+ BufferData<MESSAGE_TYPE> bufferData = new
BufferData<>(parser.parseDelimitedFrom(inputStream));
- MESSAGE_TYPE messageType =
parser.parseDelimitedFrom(inputStream);
- if (messageType != null) {
- int i = 0;
- while (!callBack.call(messageType)) {
- try {
- TimeUnit.MILLISECONDS.sleep(500);
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
- }
+ if (bufferData.getMessageType() != null) {
+ boolean isComplete = callBack.call(bufferData);
+ final int serialized =
bufferData.getMessageType().getSerializedSize();
+ final int offset =
CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
+ readOffset.setOffset(readOffset.getOffset() + offset);
- i++;
- if (i == 10) {
- break;
+ if (!isComplete) {
+ if (bufferDataCollection.size() == collectionSize) {
+ reCall();
}
+ bufferDataCollection.add(bufferData);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("collection size: {}, max size: {}",
bufferDataCollection.size(), collectionSize);
+ }
+ } else if (bufferDataCollection.size() > 0) {
+ reCall();
+ } else {
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
}
- final int serialized = messageType.getSerializedSize();
- final int offset =
CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
- readOffset.setOffset(readOffset.getOffset() + offset);
}
}
} catch (IOException e) {
@@ -140,7 +150,31 @@ public class DataStreamReader<MESSAGE_TYPE extends
GeneratedMessageV3> {
}
}
+ private void reCall() {
+ int maxCycle = 10;
+ for (int i = 1; i <= maxCycle; i++) {
+ if (bufferDataCollection.size() > 0) {
+ List<BufferData<MESSAGE_TYPE>> bufferDataList =
bufferDataCollection.export();
+ for (BufferData<MESSAGE_TYPE> data : bufferDataList) {
+ if (!callBack.call(data)) {
+ if (i != maxCycle) {
+ bufferDataCollection.add(data);
+ }
+ }
+ }
+
+ try {
+ TimeUnit.MILLISECONDS.sleep(500);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
public interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
- boolean call(MESSAGE_TYPE message);
+ boolean call(BufferData<MESSAGE_TYPE> bufferData);
}
}
diff --git
a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
index 9752de7..4b7e7d6 100644
---
a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
+++
b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
@@ -37,7 +37,10 @@ public class BufferStreamTestCase {
builder.dataFileMaxSize(50);
builder.offsetFileMaxSize(10);
builder.parser(TraceSegmentObject.parser());
- builder.callBack(new SegmentParse());
+ builder.callBack(bufferData -> {
+ logger.info("segment parse: {}",
bufferData.getMessageType().getSpans(0).getSpanId());
+ return false;
+ });
BufferStream<TraceSegmentObject> stream = builder.build();
stream.initialize();
@@ -62,14 +65,5 @@ public class BufferStreamTestCase {
TimeUnit.MILLISECONDS.sleep(50);
}
}
-
- }
-
- private static class SegmentParse implements
DataStreamReader.CallBack<TraceSegmentObject> {
-
- @Override public boolean call(TraceSegmentObject message) {
- logger.info("segment parse: {}", message.getSpans(0).getSpanId());
- return true;
- }
}
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
index 92e97b4..ba6da62 100644
---
a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
+++
b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/MeshDataBufferFileCache.java
@@ -23,8 +23,7 @@ import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
-import org.apache.skywalking.oap.server.library.buffer.BufferStream;
-import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
+import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
@@ -97,11 +96,11 @@ public class MeshDataBufferFileCache implements
IConsumer<ServiceMeshMetricDataD
/**
* File buffer callback. Block reading from buffer file, until metadata
register done.
*
- * @param message
+ * @param bufferData
* @return
*/
- @Override public boolean call(ServiceMeshMetric message) {
- ServiceMeshMetricDataDecorator decorator = new
ServiceMeshMetricDataDecorator(message);
+ @Override public boolean call(BufferData<ServiceMeshMetric> bufferData) {
+ ServiceMeshMetricDataDecorator decorator = new
ServiceMeshMetricDataDecorator(bufferData.getMessageType());
if (decorator.tryMetaDataRegister()) {
meshBufferFileOut.inc();
TelemetryDataDispatcher.doDispatch(decorator);
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index eb8c692..0a4bd69 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -22,7 +22,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import java.util.*;
import lombok.Setter;
import org.apache.skywalking.apm.network.language.agent.*;
-import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
+import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import
org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
@@ -66,12 +66,17 @@ public class SegmentParse {
MetricTag.EMPTY_KEY, MetricTag.EMPTY_VALUE);
}
- public boolean parse(UpstreamSegment segment, Source source) {
+ public boolean parse(BufferData<UpstreamSegment> bufferData, Source
source) {
createSpanListeners();
try {
- List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
- TraceSegmentObject segmentObject = parseBinarySegment(segment);
+ UpstreamSegment upstreamSegment = bufferData.getMessageType();
+ List<UniqueId> traceIds = upstreamSegment.getGlobalTraceIdsList();
+
+ if (bufferData.getV1Segment() == null) {
+ bufferData.setV1Segment(parseBinarySegment(upstreamSegment));
+ }
+ TraceSegmentObject segmentObject = bufferData.getV1Segment();
SegmentDecorator segmentDecorator = new
SegmentDecorator(segmentObject);
@@ -81,7 +86,7 @@ public class SegmentParse {
}
if (source.equals(Source.Agent)) {
- writeToBufferFile(segmentCoreInfo.getSegmentId(), segment);
+ writeToBufferFile(segmentCoreInfo.getSegmentId(),
upstreamSegment);
} else {
// from SegmentSource.Buffer
TRACE_BUFFER_FILE_RETRY.inc();
@@ -127,16 +132,18 @@ public class SegmentParse {
segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray());
segmentCoreInfo.setV2(false);
+ boolean exchanged = true;
+
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
if
(!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator,
segmentCoreInfo.getServiceId())) {
- return false;
+ exchanged = false;
} else {
for (int j = 0; j < spanDecorator.getRefsCount(); j++) {
ReferenceDecorator referenceDecorator =
spanDecorator.getRefs(j);
if
(!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator,
segmentCoreInfo.getServiceId())) {
- return false;
+ exchanged = false;
}
}
}
@@ -150,28 +157,30 @@ public class SegmentParse {
segmentCoreInfo.setError(spanDecorator.getIsError() ||
segmentCoreInfo.isError());
}
- long minuteTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime());
- segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket);
+ if (exchanged) {
+ long minuteTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime());
+ segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket);
- for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
- SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
+ for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
+ SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
- if (spanDecorator.getSpanId() == 0) {
- notifyFirstListener(spanDecorator);
- }
+ if (spanDecorator.getSpanId() == 0) {
+ notifyFirstListener(spanDecorator);
+ }
- if (SpanType.Exit.equals(spanDecorator.getSpanType())) {
- notifyExitListener(spanDecorator);
- } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) {
- notifyEntryListener(spanDecorator);
- } else if (SpanType.Local.equals(spanDecorator.getSpanType())) {
- notifyLocalListener(spanDecorator);
- } else {
- logger.error("span type value was unexpected, span type name:
{}", spanDecorator.getSpanType().name());
+ if (SpanType.Exit.equals(spanDecorator.getSpanType())) {
+ notifyExitListener(spanDecorator);
+ } else if (SpanType.Entry.equals(spanDecorator.getSpanType()))
{
+ notifyEntryListener(spanDecorator);
+ } else if (SpanType.Local.equals(spanDecorator.getSpanType()))
{
+ notifyLocalListener(spanDecorator);
+ } else {
+ logger.error("span type value was unexpected, span type
name: {}", spanDecorator.getSpanType().name());
+ }
}
}
- return true;
+ return exchanged;
}
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment)
{
@@ -251,13 +260,13 @@ public class SegmentParse {
public void send(UpstreamSegment segment, Source source) {
SegmentParse segmentParse = new SegmentParse(moduleManager,
listenerManager);
segmentParse.setStandardizationWorker(standardizationWorker);
- segmentParse.parse(segment, source);
+ segmentParse.parse(new BufferData<>(segment), source);
}
- @Override public boolean call(UpstreamSegment segment) {
+ @Override public boolean call(BufferData<UpstreamSegment> bufferData) {
SegmentParse segmentParse = new SegmentParse(moduleManager,
listenerManager);
segmentParse.setStandardizationWorker(standardizationWorker);
- boolean parseResult = segmentParse.parse(segment, Source.Buffer);
+ boolean parseResult = segmentParse.parse(bufferData,
Source.Buffer);
if (parseResult) {
segmentParse.TRACE_BUFFER_FILE_OUT.inc();
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
index be074ed..dd28ba4 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
@@ -23,7 +23,7 @@ import java.util.*;
import lombok.Setter;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
-import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
+import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import
org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
@@ -71,12 +71,18 @@ public class SegmentParseV2 {
}
}
- public boolean parse(UpstreamSegment segment, SegmentSource source) {
+ public boolean parse(BufferData<UpstreamSegment> bufferData, SegmentSource
source) {
createSpanListeners();
try {
- List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
- SegmentObject segmentObject = parseBinarySegment(segment);
+ UpstreamSegment upstreamSegment = bufferData.getMessageType();
+
+ List<UniqueId> traceIds = upstreamSegment.getGlobalTraceIdsList();
+
+ if (bufferData.getV2Segment() == null) {
+ bufferData.setV2Segment(parseBinarySegment(upstreamSegment));
+ }
+ SegmentObject segmentObject = parseBinarySegment(upstreamSegment);
SegmentDecorator segmentDecorator = new
SegmentDecorator(segmentObject);
@@ -86,7 +92,7 @@ public class SegmentParseV2 {
}
if (source.equals(SegmentSource.Agent)) {
- writeToBufferFile(segmentCoreInfo.getSegmentId(), segment);
+ writeToBufferFile(segmentCoreInfo.getSegmentId(),
upstreamSegment);
} else {
// from SegmentSource.Buffer
TRACE_BUFFER_FILE_RETRY.inc();
@@ -132,16 +138,18 @@ public class SegmentParseV2 {
segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray());
segmentCoreInfo.setV2(true);
+ boolean exchanged = true;
+
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
if
(!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator,
segmentCoreInfo.getServiceId())) {
- return false;
+ exchanged = false;
} else {
for (int j = 0; j < spanDecorator.getRefsCount(); j++) {
ReferenceDecorator referenceDecorator =
spanDecorator.getRefs(j);
if
(!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator,
segmentCoreInfo.getServiceId())) {
- return false;
+ exchanged = false;
}
}
}
@@ -155,28 +163,30 @@ public class SegmentParseV2 {
segmentCoreInfo.setError(spanDecorator.getIsError() ||
segmentCoreInfo.isError());
}
- long minuteTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime());
- segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket);
+ if (exchanged) {
+ long minuteTimeBucket =
TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime());
+ segmentCoreInfo.setMinuteTimeBucket(minuteTimeBucket);
- for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
- SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
+ for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
+ SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
- if (spanDecorator.getSpanId() == 0) {
- notifyFirstListener(spanDecorator);
- }
+ if (spanDecorator.getSpanId() == 0) {
+ notifyFirstListener(spanDecorator);
+ }
- if (SpanType.Exit.equals(spanDecorator.getSpanType())) {
- notifyExitListener(spanDecorator);
- } else if (SpanType.Entry.equals(spanDecorator.getSpanType())) {
- notifyEntryListener(spanDecorator);
- } else if (SpanType.Local.equals(spanDecorator.getSpanType())) {
- notifyLocalListener(spanDecorator);
- } else {
- logger.error("span type value was unexpected, span type name:
{}", spanDecorator.getSpanType().name());
+ if (SpanType.Exit.equals(spanDecorator.getSpanType())) {
+ notifyExitListener(spanDecorator);
+ } else if (SpanType.Entry.equals(spanDecorator.getSpanType()))
{
+ notifyEntryListener(spanDecorator);
+ } else if (SpanType.Local.equals(spanDecorator.getSpanType()))
{
+ notifyLocalListener(spanDecorator);
+ } else {
+ logger.error("span type value was unexpected, span type
name: {}", spanDecorator.getSpanType().name());
+ }
}
}
- return true;
+ return exchanged;
}
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment)
{
@@ -252,13 +262,13 @@ public class SegmentParseV2 {
public void send(UpstreamSegment segment, SegmentSource source) {
SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager,
listenerManager);
segmentParse.setStandardizationWorker(standardizationWorker);
- segmentParse.parse(segment, source);
+ segmentParse.parse(new BufferData<>(segment), source);
}
- @Override public boolean call(UpstreamSegment segment) {
+ @Override public boolean call(BufferData<UpstreamSegment> bufferData) {
SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager,
listenerManager);
segmentParse.setStandardizationWorker(standardizationWorker);
- boolean parseResult = segmentParse.parse(segment,
SegmentSource.Buffer);
+ boolean parseResult = segmentParse.parse(bufferData,
SegmentSource.Buffer);
if (parseResult) {
segmentParse.TRACE_BUFFER_FILE_OUT.inc();
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
index decbdde..2e8b98c 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
@@ -53,6 +53,8 @@ public class ReferenceIdExchanger implements
IdExchanger<ReferenceDecorator> {
}
@Override public boolean exchange(ReferenceDecorator standardBuilder, int
serviceId) {
+ boolean exchanged = true;
+
if (standardBuilder.getEntryEndpointId() == 0) {
String entryEndpointName =
Strings.isNullOrEmpty(standardBuilder.getEntryEndpointName()) ?
Const.DOMAIN_OPERATION_NAME : standardBuilder.getEntryEndpointName();
int entryServiceId =
serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()).getServiceId();
@@ -61,7 +63,8 @@ public class ReferenceIdExchanger implements
IdExchanger<ReferenceDecorator> {
if (logger.isDebugEnabled()) {
logger.debug("entry endpoint name: {} from service id: {}
exchange failed", entryEndpointName, entryServiceId);
}
- return false;
+
+ exchanged = false;
} else {
standardBuilder.toBuilder();
standardBuilder.setEntryEndpointId(entryEndpointId);
@@ -78,7 +81,8 @@ public class ReferenceIdExchanger implements
IdExchanger<ReferenceDecorator> {
if (logger.isDebugEnabled()) {
logger.debug("parent endpoint name: {} from service id: {}
exchange failed", parentEndpointName, parentServiceId);
}
- return false;
+
+ exchanged = false;
} else {
standardBuilder.toBuilder();
standardBuilder.setParentEndpointId(parentEndpointId);
@@ -93,14 +97,15 @@ public class ReferenceIdExchanger implements
IdExchanger<ReferenceDecorator> {
if (logger.isDebugEnabled()) {
logger.debug("network getAddress: {} from service id: {}
exchange failed", standardBuilder.getNetworkAddress(), serviceId);
}
- return false;
+
+ exchanged = false;
} else {
standardBuilder.toBuilder();
standardBuilder.setNetworkAddressId(networkAddressId);
standardBuilder.setNetworkAddress(Const.EMPTY_STRING);
}
}
- return true;
+ return exchanged;
}
/**
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
index dfeeb33..42bf56c 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
@@ -63,6 +63,8 @@ public class SpanIdExchanger implements
IdExchanger<SpanDecorator> {
}
@Override public boolean exchange(SpanDecorator standardBuilder, int
serviceId) {
+ boolean exchanged = true;
+
if (standardBuilder.getComponentId() == 0 &&
!Strings.isNullOrEmpty(standardBuilder.getComponent())) {
int componentId =
componentLibraryCatalogService.getComponentId(standardBuilder.getComponent());
@@ -70,7 +72,8 @@ public class SpanIdExchanger implements
IdExchanger<SpanDecorator> {
if (logger.isDebugEnabled()) {
logger.debug("component: {} in service: {} exchange
failed", standardBuilder.getComponent(), serviceId);
}
- return false;
+
+ exchanged = false;
} else {
standardBuilder.toBuilder();
standardBuilder.setComponentId(componentId);
@@ -86,7 +89,8 @@ public class SpanIdExchanger implements
IdExchanger<SpanDecorator> {
if (logger.isDebugEnabled()) {
logger.debug("peer: {} in service: {} exchange failed",
standardBuilder.getPeer(), serviceId);
}
- return false;
+
+ exchanged = false;
} else {
standardBuilder.toBuilder();
standardBuilder.setPeerId(peerId);
@@ -123,14 +127,15 @@ public class SpanIdExchanger implements
IdExchanger<SpanDecorator> {
if (logger.isDebugEnabled()) {
logger.debug("endpoint name: {} from service id: {}
exchange failed", endpointName, serviceId);
}
- return false;
+
+ exchanged = false;
} else {
standardBuilder.toBuilder();
standardBuilder.setOperationNameId(endpointId);
standardBuilder.setOperationName(Const.EMPTY_STRING);
}
}
- return true;
+ return exchanged;
}
private JsonObject buildServiceProperties(SpanDecorator standardBuilder) {
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
index 53e6557..2dec7cc 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
@@ -43,10 +43,18 @@ public class RegisterLockInstaller {
}
public void install() throws StorageException {
+ boolean debug = System.getProperty("debug") != null;
+
try {
if (!client.isExistsIndex(RegisterLockIndex.NAME)) {
+ logger.info("table: {} does not exist",
RegisterLockIndex.NAME);
+ createIndex();
+ } else if (debug) {
+ logger.info("table: {} exists", RegisterLockIndex.NAME);
+ deleteIndex();
createIndex();
}
+
for (Class registerSource :
InventoryProcess.INSTANCE.getAllRegisterSources()) {
Scope sourceScope =
StorageEntityAnnotationUtils.getSourceScope(registerSource);
putIfAbsent(sourceScope.ordinal());
@@ -56,6 +64,10 @@ public class RegisterLockInstaller {
}
}
+ private void deleteIndex() throws IOException {
+ client.deleteIndex(RegisterLockIndex.NAME);
+ }
+
private void createIndex() throws IOException {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)