This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 6de3e55  Too many EndOfBatchContext instance created. (#3129)
6de3e55 is described below

commit 6de3e55280507397fa695e1968a5286de18d8d2b
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Sun Jul 21 00:26:17 2019 +0800

    Too many EndOfBatchContext instance created. (#3129)
---
 .../core/analysis/data/EndOfBatchContext.java      | 39 ----------------------
 .../oap/server/core/analysis/data/QueueData.java   |  6 ++--
 .../analysis/worker/MetricsAggregateWorker.java    |  8 ++---
 .../analysis/worker/MetricsPersistentWorker.java   |  6 ++--
 .../register/worker/RegisterDistinctWorker.java    |  7 ++--
 .../register/worker/RegisterPersistentWorker.java  | 27 +++++----------
 .../oap/server/core/remote/data/StreamData.java    | 16 +++++----
 .../standardization/SegmentStandardization.java    | 16 +++++----
 8 files changed, 42 insertions(+), 83 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java
deleted file mode 100644
index 528ace9..0000000
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.core.analysis.data;
-
-/**
- * @author peng-yongsheng
- */
-public class EndOfBatchContext {
-
-    private boolean isEndOfBatch;
-
-    public EndOfBatchContext(boolean isEndOfBatch) {
-        this.isEndOfBatch = isEndOfBatch;
-    }
-
-    public boolean isEndOfBatch() {
-        return isEndOfBatch;
-    }
-
-    public void setEndOfBatch(boolean endOfBatch) {
-        isEndOfBatch = endOfBatch;
-    }
-}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
index 858e62d..494d348 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java
@@ -23,7 +23,9 @@ package org.apache.skywalking.oap.server.core.analysis.data;
  */
 public interface QueueData {
 
-    EndOfBatchContext getEndOfBatchContext();
+    void resetEndOfBatch();
 
-    void setEndOfBatchContext(EndOfBatchContext context);
+    void asEndOfBatch();
+
+    boolean isEndOfBatch();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 59c75bb..ff3d0d0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -22,7 +22,7 @@ import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -63,7 +63,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
     }
 
     @Override public final void in(Metrics metrics) {
-        metrics.setEndOfBatchContext(new EndOfBatchContext(false));
+        metrics.resetEndOfBatch();
         dataCarrier.produce(metrics);
     }
 
@@ -71,7 +71,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
         aggregationCounter.inc();
         aggregate(metrics);
 
-        if (metrics.getEndOfBatchContext().isEndOfBatch()) {
+        if (metrics.isEndOfBatch()) {
             sendToNext();
         }
     }
@@ -127,7 +127,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
                 Metrics metrics = inputIterator.next();
                 i++;
                 if (i == data.size()) {
-                    metrics.getEndOfBatchContext().setEndOfBatch(true);
+                    metrics.asEndOfBatch();
                 }
                 aggregator.onWork(metrics);
             }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 4e819b4..066f70f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -22,7 +22,7 @@ import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
@@ -76,7 +76,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
     }
 
     @Override public void in(Metrics metrics) {
-        metrics.setEndOfBatchContext(new EndOfBatchContext(false));
+        metrics.resetEndOfBatch();
         dataCarrier.produce(metrics);
     }
 
@@ -193,7 +193,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
                 Metrics metrics = inputIterator.next();
                 i++;
                 if (i == data.size()) {
-                    metrics.getEndOfBatchContext().setEndOfBatch(true);
+                    metrics.asEndOfBatch();
                 }
                 persistent.onWork(metrics);
             }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index b077b4f..cdb8fb4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -22,7 +22,6 @@ import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -60,7 +59,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
     }
 
     @Override public final void in(RegisterSource source) {
-        source.setEndOfBatchContext(new EndOfBatchContext(false));
+        source.resetEndOfBatch();
         dataCarrier.produce(source);
     }
 
@@ -73,7 +72,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
             sources.get(source).combine(source);
         }
 
-        if (messageNum >= 1000 || source.getEndOfBatchContext().isEndOfBatch()) {
+        if (messageNum >= 1000 || source.isEndOfBatch()) {
             sources.values().forEach(nextWorker::in);
             sources.clear();
             messageNum = 0;
@@ -99,7 +98,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
                 RegisterSource source = sourceIterator.next();
                 i++;
                 if (i == sources.size()) {
-                    source.getEndOfBatchContext().setEndOfBatch(true);
+                    source.asEndOfBatch();
                 }
                 aggregator.onWork(source);
             }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 2fb3101..3a481e9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -18,27 +18,16 @@
 
 package org.apache.skywalking.oap.server.core.register.worker;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
-import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -80,7 +69,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
     }
 
     @Override public final void in(RegisterSource registerSource) {
-        registerSource.setEndOfBatchContext(new EndOfBatchContext(false));
+        registerSource.resetEndOfBatch();
         dataCarrier.produce(registerSource);
     }
 
@@ -91,7 +80,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
             sources.get(registerSource).combine(registerSource);
         }
 
-        if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) {
+        if (sources.size() > 1000 || registerSource.isEndOfBatch()) {
             sources.values().forEach(source -> {
                 try {
                     RegisterSource dbSource = registerDAO.get(modelName, source.id());
@@ -147,7 +136,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
                 RegisterSource registerSource = sourceIterator.next();
                 i++;
                 if (i == data.size()) {
-                    registerSource.getEndOfBatchContext().setEndOfBatch(true);
+                    registerSource.asEndOfBatch();
                 }
                 persistent.onWork(registerSource);
             }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
index 532130f..d2875eb 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.remote.data;
 
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
 import org.apache.skywalking.oap.server.core.remote.*;
 
 /**
@@ -26,14 +26,18 @@ import org.apache.skywalking.oap.server.core.remote.*;
  */
 public abstract class StreamData implements QueueData, Serializable, Deserializable {
 
-    private EndOfBatchContext endOfBatchContext;
+    private boolean endOfBatch = false;
 
-    @Override public final EndOfBatchContext getEndOfBatchContext() {
-        return this.endOfBatchContext;
+    @Override public void resetEndOfBatch() {
+        this.endOfBatch = false;
     }
 
-    @Override public final void setEndOfBatchContext(EndOfBatchContext context) {
-        this.endOfBatchContext = context;
+    @Override public void asEndOfBatch() {
+        this.endOfBatch = true;
+    }
+
+    @Override public boolean isEndOfBatch() {
+        return this.endOfBatch;
     }
 
     public abstract int remoteHashCode();
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java
index 6e23e47..d1e3084 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java
@@ -19,7 +19,7 @@
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization;
 
 import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
 
 /**
  * @author peng-yongsheng
@@ -36,14 +36,18 @@ public class SegmentStandardization implements QueueData {
         return id;
     }
 
-    private EndOfBatchContext context;
+    private boolean endOfBatch = false;
 
-    @Override public EndOfBatchContext getEndOfBatchContext() {
-        return this.context;
+    @Override public void resetEndOfBatch() {
+        this.endOfBatch = false;
     }
 
-    @Override public void setEndOfBatchContext(EndOfBatchContext context) {
-        this.context = context;
+    @Override public void asEndOfBatch() {
+        this.endOfBatch = true;
+    }
+
+    @Override public boolean isEndOfBatch() {
+        return this.endOfBatch;
     }
 
     private UpstreamSegment upstreamSegment;

Reply via email to