This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 6de3e55 Too many EndOfBatchContext instances created. (#3129)
6de3e55 is described below
commit 6de3e55280507397fa695e1968a5286de18d8d2b
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Sun Jul 21 00:26:17 2019 +0800
Too many EndOfBatchContext instances created. (#3129)
---
.../core/analysis/data/EndOfBatchContext.java | 39 ----------------------
.../oap/server/core/analysis/data/QueueData.java | 6 ++--
.../analysis/worker/MetricsAggregateWorker.java | 8 ++---
.../analysis/worker/MetricsPersistentWorker.java | 6 ++--
.../register/worker/RegisterDistinctWorker.java | 7 ++--
.../register/worker/RegisterPersistentWorker.java | 27 +++++----------
.../oap/server/core/remote/data/StreamData.java | 16 +++++----
.../standardization/SegmentStandardization.java | 16 +++++----
8 files changed, 42 insertions(+), 83 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java
deleted file mode 100644
index 528ace9..0000000
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-/**
- * @author peng-yongsheng
- */
-public class EndOfBatchContext {
-
- private boolean isEndOfBatch;
-
- public EndOfBatchContext(boolean isEndOfBatch) {
- this.isEndOfBatch = isEndOfBatch;
- }
-
- public boolean isEndOfBatch() {
- return isEndOfBatch;
- }
-
- public void setEndOfBatch(boolean endOfBatch) {
- isEndOfBatch = endOfBatch;
- }
-}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
index 858e62d..494d348 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
@@ -23,7 +23,9 @@ package org.apache.skywalking.oap.server.core.analysis.data;
*/
public interface QueueData {
- EndOfBatchContext getEndOfBatchContext();
+ void resetEndOfBatch();
- void setEndOfBatchContext(EndOfBatchContext context);
+ void asEndOfBatch();
+
+ boolean isEndOfBatch();
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 59c75bb..ff3d0d0 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -22,7 +22,7 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -63,7 +63,7 @@ public class MetricsAggregateWorker extends
AbstractWorker<Metrics> {
}
@Override public final void in(Metrics metrics) {
- metrics.setEndOfBatchContext(new EndOfBatchContext(false));
+ metrics.resetEndOfBatch();
dataCarrier.produce(metrics);
}
@@ -71,7 +71,7 @@ public class MetricsAggregateWorker extends
AbstractWorker<Metrics> {
aggregationCounter.inc();
aggregate(metrics);
- if (metrics.getEndOfBatchContext().isEndOfBatch()) {
+ if (metrics.isEndOfBatch()) {
sendToNext();
}
}
@@ -127,7 +127,7 @@ public class MetricsAggregateWorker extends
AbstractWorker<Metrics> {
Metrics metrics = inputIterator.next();
i++;
if (i == data.size()) {
- metrics.getEndOfBatchContext().setEndOfBatch(true);
+ metrics.asEndOfBatch();
}
aggregator.onWork(metrics);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 4e819b4..066f70f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -22,7 +22,7 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
@@ -76,7 +76,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
}
@Override public void in(Metrics metrics) {
- metrics.setEndOfBatchContext(new EndOfBatchContext(false));
+ metrics.resetEndOfBatch();
dataCarrier.produce(metrics);
}
@@ -193,7 +193,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
Metrics metrics = inputIterator.next();
i++;
if (i == data.size()) {
- metrics.getEndOfBatchContext().setEndOfBatch(true);
+ metrics.asEndOfBatch();
}
persistent.onWork(metrics);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index b077b4f..cdb8fb4 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -22,7 +22,6 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -60,7 +59,7 @@ public class RegisterDistinctWorker extends
AbstractWorker<RegisterSource> {
}
@Override public final void in(RegisterSource source) {
- source.setEndOfBatchContext(new EndOfBatchContext(false));
+ source.resetEndOfBatch();
dataCarrier.produce(source);
}
@@ -73,7 +72,7 @@ public class RegisterDistinctWorker extends
AbstractWorker<RegisterSource> {
sources.get(source).combine(source);
}
- if (messageNum >= 1000 ||
source.getEndOfBatchContext().isEndOfBatch()) {
+ if (messageNum >= 1000 || source.isEndOfBatch()) {
sources.values().forEach(nextWorker::in);
sources.clear();
messageNum = 0;
@@ -99,7 +98,7 @@ public class RegisterDistinctWorker extends
AbstractWorker<RegisterSource> {
RegisterSource source = sourceIterator.next();
i++;
if (i == sources.size()) {
- source.getEndOfBatchContext().setEndOfBatch(true);
+ source.asEndOfBatch();
}
aggregator.onWork(source);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 2fb3101..3a481e9 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -18,27 +18,16 @@
package org.apache.skywalking.oap.server.core.register.worker;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import
org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -80,7 +69,7 @@ public class RegisterPersistentWorker extends
AbstractWorker<RegisterSource> {
}
@Override public final void in(RegisterSource registerSource) {
- registerSource.setEndOfBatchContext(new EndOfBatchContext(false));
+ registerSource.resetEndOfBatch();
dataCarrier.produce(registerSource);
}
@@ -91,7 +80,7 @@ public class RegisterPersistentWorker extends
AbstractWorker<RegisterSource> {
sources.get(registerSource).combine(registerSource);
}
- if (sources.size() > 1000 ||
registerSource.getEndOfBatchContext().isEndOfBatch()) {
+ if (sources.size() > 1000 || registerSource.isEndOfBatch()) {
sources.values().forEach(source -> {
try {
RegisterSource dbSource = registerDAO.get(modelName,
source.id());
@@ -147,7 +136,7 @@ public class RegisterPersistentWorker extends
AbstractWorker<RegisterSource> {
RegisterSource registerSource = sourceIterator.next();
i++;
if (i == data.size()) {
- registerSource.getEndOfBatchContext().setEndOfBatch(true);
+ registerSource.asEndOfBatch();
}
persistent.onWork(registerSource);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
index 532130f..d2875eb 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.remote.data;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
import org.apache.skywalking.oap.server.core.remote.*;
/**
@@ -26,14 +26,18 @@ import org.apache.skywalking.oap.server.core.remote.*;
*/
public abstract class StreamData implements QueueData, Serializable,
Deserializable {
- private EndOfBatchContext endOfBatchContext;
+ private boolean endOfBatch = false;
- @Override public final EndOfBatchContext getEndOfBatchContext() {
- return this.endOfBatchContext;
+ @Override public void resetEndOfBatch() {
+ this.endOfBatch = false;
}
- @Override public final void setEndOfBatchContext(EndOfBatchContext
context) {
- this.endOfBatchContext = context;
+ @Override public void asEndOfBatch() {
+ this.endOfBatch = true;
+ }
+
+ @Override public boolean isEndOfBatch() {
+ return this.endOfBatch;
}
public abstract int remoteHashCode();
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java
index 6e23e47..d1e3084 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java
@@ -19,7 +19,7 @@
package
org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
/**
* @author peng-yongsheng
@@ -36,14 +36,18 @@ public class SegmentStandardization implements QueueData {
return id;
}
- private EndOfBatchContext context;
+ private boolean endOfBatch = false;
- @Override public EndOfBatchContext getEndOfBatchContext() {
- return this.context;
+ @Override public void resetEndOfBatch() {
+ this.endOfBatch = false;
}
- @Override public void setEndOfBatchContext(EndOfBatchContext context) {
- this.context = context;
+ @Override public void asEndOfBatch() {
+ this.endOfBatch = true;
+ }
+
+ @Override public boolean isEndOfBatch() {
+ return this.endOfBatch;
}
private UpstreamSegment upstreamSegment;