This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 8306ad4  Reduce the number of threads to improve performance. (#3133)
8306ad4 is described below

commit 8306ad45318ba1679c118658fb3761c62801b33f
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Sun Jul 21 15:51:24 2019 +0800

    Reduce the number of threads to improve performance. (#3133)
    
    * Refactor Persistence worker.
    
    * 1. Provide InsertRequest and UpdateRequest interfaces for preparing 
persistence.
    2. Implement the ids query for H2 metrics DAO.
    
    * Refactor worker framework
    
    * Use queue to receive asynchronous batch request.
    
    * Rename the Datacarrier thread name.
    
    * Fixed some mistakes.
    
    * New mistake.
---
 .../analysis/worker/MetricsPersistentWorker.java   | 45 +++--------
 .../analysis/worker/MetricsStreamProcessor.java    | 27 ++-----
 .../core/analysis/worker/PersistenceWorker.java    | 32 ++------
 .../analysis/worker/RecordPersistentWorker.java    | 86 ++++----------------
 .../analysis/worker/RecordStreamProcessor.java     |  6 +-
 .../server/core/analysis/worker/TopNWorker.java    | 33 +++-----
 .../oap/server/core/storage/IBatchDAO.java         |  5 +-
 .../oap/server/core/storage/IRecordDAO.java        |  5 +-
 .../oap/server/core/storage/PersistenceTimer.java  | 93 +++++-----------------
 .../oap/server/core/worker/AbstractWorker.java     |  1 +
 .../elasticsearch/base/BatchProcessEsDAO.java      | 34 +++-----
 .../plugin/elasticsearch/base/RecordEsDAO.java     |  6 +-
 .../storage/plugin/jdbc/h2/dao/H2BatchDAO.java     | 60 +++++++++++---
 .../storage/plugin/jdbc/h2/dao/H2RecordDAO.java    |  6 +-
 .../storage/plugin/jdbc/h2/dao/H2SQLExecutor.java  | 31 +++-----
 15 files changed, 150 insertions(+), 320 deletions(-)

diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 14135dc..34e6436 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -28,6 +28,7 @@ import 
org.apache.skywalking.oap.server.core.exporter.ExportEvent;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.slf4j.*;
 
@@ -45,10 +46,9 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
     private final AbstractWorker<ExportEvent> nextExportWorker;
     private final DataCarrier<Metrics> dataCarrier;
 
-    MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, int batchSize,
-        IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
+    MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model 
model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
         AbstractWorker<ExportEvent> nextExportWorker) {
-        super(moduleDefineHolder, batchSize);
+        super(moduleDefineHolder);
         this.model = model;
         this.mergeDataCache = new MergeDataCache<>();
         this.metricsDAO = metricsDAO;
@@ -76,7 +76,6 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
     }
 
     @Override public void in(Metrics metrics) {
-        metrics.resetEndOfBatch();
         dataCarrier.produce(metrics);
     }
 
@@ -84,23 +83,9 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
         return mergeDataCache;
     }
 
-    public boolean flushAndSwitch() {
-        boolean isSwitch;
-        try {
-            if (isSwitch = getCache().trySwitchPointer()) {
-                getCache().switchPointer();
-            }
-        } finally {
-            getCache().trySwitchPointerFinally();
-        }
-        return isSwitch;
-    }
-
-    @Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
+    @Override public void prepareBatch(MergeDataCache<Metrics> cache, 
List<PrepareRequest> prepareRequests) {
         long start = System.currentTimeMillis();
 
-        List<Object> batchCollection = new LinkedList<>();
-
         Collection<Metrics> collection = cache.getLast().collection();
 
         int i = 0;
@@ -131,9 +116,9 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
                         if (dbMetricsMap.containsKey(metric.id())) {
                             metric.combine(dbMetricsMap.get(metric.id()));
                             metric.calculate();
-                            
batchCollection.add(metricsDAO.prepareBatchUpdate(model, metric));
+                            
prepareRequests.add(metricsDAO.prepareBatchUpdate(model, metric));
                         } else {
-                            
batchCollection.add(metricsDAO.prepareBatchInsert(model, metric));
+                            
prepareRequests.add(metricsDAO.prepareBatchInsert(model, metric));
                         }
 
                         if (Objects.nonNull(nextAlarmWorker)) {
@@ -152,11 +137,9 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
             i++;
         }
 
-        if (batchCollection.size() > 0) {
-            logger.debug("prepareBatch model {}, took time: {}", 
model.getName(), System.currentTimeMillis() - start);
+        if (prepareRequests.size() > 0) {
+            logger.debug("prepare batch requests for model {}, took time: {}", 
model.getName(), System.currentTimeMillis() - start);
         }
-
-        return batchCollection;
     }
 
     @Override public void cacheData(Metrics input) {
@@ -186,17 +169,7 @@ public class MetricsPersistentWorker extends 
PersistenceWorker<Metrics, MergeDat
         }
 
         @Override public void consume(List<Metrics> data) {
-            Iterator<Metrics> inputIterator = data.iterator();
-
-            int i = 0;
-            while (inputIterator.hasNext()) {
-                Metrics metrics = inputIterator.next();
-                i++;
-                if (i == data.size()) {
-                    metrics.asEndOfBatch();
-                }
-                persistent.onWork(metrics);
-            }
+            data.forEach(persistent::onWork);
         }
 
         @Override public void onError(List<Metrics> data, Throwable t) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 78d0398..e81906e 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -18,25 +18,15 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import lombok.Getter;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
-import org.apache.skywalking.oap.server.core.analysis.Downsampling;
-import org.apache.skywalking.oap.server.core.analysis.Stream;
-import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 
@@ -61,6 +51,7 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         }
     }
 
+    @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, 
Class<? extends Metrics> metricsClass) {
         if (DisableRegister.INSTANCE.include(stream.name())) {
             return;
@@ -114,16 +105,14 @@ public class MetricsStreamProcessor implements 
StreamProcessor<Metrics> {
         AlarmNotifyWorker alarmNotifyWorker = new 
AlarmNotifyWorker(moduleDefineHolder);
         ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
 
-        MetricsPersistentWorker minutePersistentWorker = new 
MetricsPersistentWorker(moduleDefineHolder, model,
-            1000, metricsDAO, alarmNotifyWorker, exportWorker);
+        MetricsPersistentWorker minutePersistentWorker = new 
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, 
alarmNotifyWorker, exportWorker);
         persistentWorkers.add(minutePersistentWorker);
 
         return minutePersistentWorker;
     }
 
     private MetricsPersistentWorker worker(ModuleDefineHolder 
moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
-        MetricsPersistentWorker persistentWorker = new 
MetricsPersistentWorker(moduleDefineHolder, model,
-            1000, metricsDAO, null, null);
+        MetricsPersistentWorker persistentWorker = new 
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null);
         persistentWorkers.add(persistentWorker);
 
         return persistentWorker;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index e8ac608..166eb9c 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -18,10 +18,11 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.*;
+import java.util.List;
 import org.apache.skywalking.oap.server.core.analysis.data.Window;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.slf4j.*;
 
@@ -32,28 +33,11 @@ public abstract class PersistenceWorker<INPUT extends 
StorageData, CACHE extends
 
     private static final Logger logger = 
LoggerFactory.getLogger(PersistenceWorker.class);
 
-    private final int batchSize;
-    private final IBatchDAO batchDAO;
-
-    PersistenceWorker(ModuleDefineHolder moduleDefineHolder, int batchSize) {
+    PersistenceWorker(ModuleDefineHolder moduleDefineHolder) {
         super(moduleDefineHolder);
-        this.batchSize = batchSize;
-        this.batchDAO = 
moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
     }
 
     void onWork(INPUT input) {
-        if (getCache().currentCollectionSize() >= batchSize) {
-            try {
-                if (getCache().trySwitchPointer()) {
-                    getCache().switchPointer();
-
-                    List<?> collection = buildBatchCollection();
-                    batchDAO.asynchronous(collection);
-                }
-            } finally {
-                getCache().trySwitchPointerFinally();
-            }
-        }
         cacheData(input);
     }
 
@@ -73,10 +57,9 @@ public abstract class PersistenceWorker<INPUT extends 
StorageData, CACHE extends
         return isSwitch;
     }
 
-    public abstract List<Object> prepareBatch(CACHE cache);
+    public abstract void prepareBatch(CACHE cache, List<PrepareRequest> 
prepareRequests);
 
-    public final List<?> buildBatchCollection() {
-        List<?> batchCollection = new LinkedList<>();
+    public final void buildBatchRequests(List<PrepareRequest> prepareRequests) 
{
         try {
             while (getCache().getLast().isWriting()) {
                 try {
@@ -87,11 +70,10 @@ public abstract class PersistenceWorker<INPUT extends 
StorageData, CACHE extends
             }
 
             if (getCache().getLast().collection() != null) {
-                batchCollection = prepareBatch(getCache());
+                prepareBatch(getCache(), prepareRequests);
             }
         } finally {
             getCache().finishReadingLast();
         }
-        return batchCollection;
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 16a0a3c..f77de2b 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -18,97 +18,39 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.*;
-import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
+import java.io.IOException;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
-public class RecordPersistentWorker extends PersistenceWorker<Record, 
NonMergeDataCache<Record>> {
+public class RecordPersistentWorker extends AbstractWorker<Record> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RecordPersistentWorker.class);
 
     private final Model model;
-    private final NonMergeDataCache<Record> nonMergeDataCache;
     private final IRecordDAO recordDAO;
-    private final DataCarrier<Record> dataCarrier;
+    private final IBatchDAO batchDAO;
 
-    RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, 
int batchSize,
-        IRecordDAO recordDAO) {
-        super(moduleDefineHolder, batchSize);
+    RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, 
IRecordDAO recordDAO) {
+        super(moduleDefineHolder);
         this.model = model;
-        this.nonMergeDataCache = new NonMergeDataCache<>();
         this.recordDAO = recordDAO;
-
-        String name = "RECORD_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 
20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
-        }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new 
RecordPersistentWorker.PersistentConsumer(this));
+        this.batchDAO = 
moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
     }
 
     @Override public void in(Record record) {
-        dataCarrier.produce(record);
-    }
-
-    @Override public NonMergeDataCache<Record> getCache() {
-        return nonMergeDataCache;
-    }
-
-    @Override public List<Object> prepareBatch(NonMergeDataCache<Record> 
cache) {
-        List<Object> batchCollection = new LinkedList<>();
-        cache.getLast().collection().forEach(record -> {
-            try {
-                batchCollection.add(recordDAO.prepareBatchInsert(model, 
record));
-            } catch (Throwable t) {
-                logger.error(t.getMessage(), t);
-            }
-        });
-        return batchCollection;
-    }
-
-    @Override public void cacheData(Record input) {
-        nonMergeDataCache.writing();
-        nonMergeDataCache.add(input);
-        nonMergeDataCache.finishWriting();
-    }
-
-    private class PersistentConsumer implements IConsumer<Record> {
-
-        private final RecordPersistentWorker persistent;
-
-        private PersistentConsumer(RecordPersistentWorker persistent) {
-            this.persistent = persistent;
-        }
-
-        @Override public void init() {
-
-        }
-
-        @Override public void consume(List<Record> data) {
-            for (Record record : data) {
-                persistent.onWork(record);
-            }
-        }
-
-        @Override public void onError(List<Record> data, Throwable t) {
-            logger.error(t.getMessage(), t);
-        }
-
-        @Override public void onExit() {
+        try {
+            InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, 
record);
+            batchDAO.asynchronous(insertRequest);
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
         }
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index b84d99b..3397f75 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
-import lombok.Getter;
 import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.*;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
@@ -48,8 +47,6 @@ public class RecordStreamProcessor implements 
StreamProcessor<Record> {
         }
     }
 
-    @Getter private List<RecordPersistentWorker> persistentWorkers = new 
ArrayList<>();
-
     @SuppressWarnings("unchecked")
     public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, 
Class<? extends Record> recordClass) {
         if (DisableRegister.INSTANCE.include(stream.name())) {
@@ -66,9 +63,8 @@ public class RecordStreamProcessor implements 
StreamProcessor<Record> {
 
         IModelSetter modelSetter = 
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
         Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), 
new Storage(stream.name(), true, true, Downsampling.Second), true);
-        RecordPersistentWorker persistentWorker = new 
RecordPersistentWorker(moduleDefineHolder, model, 4000, recordDAO);
+        RecordPersistentWorker persistentWorker = new 
RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
 
-        persistentWorkers.add(persistentWorker);
         workers.put(recordClass, persistentWorker);
     }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 37a8265..0287f07 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -18,20 +18,21 @@
 
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
-import java.util.*;
+import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import 
org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
 import org.slf4j.*;
 
 /**
  * Top N worker is a persistence worker, but no
  *
- * @author wusheng
+ * @author wusheng, peng-yongsheng
  */
 public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<TopN>> {
 
@@ -44,9 +45,9 @@ public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<Top
     private long reportCycle;
     private volatile long lastReportTimestamp;
 
-    public TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
+    TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
         int topNSize, IRecordDAO recordDAO) {
-        super(moduleDefineHolder, -1);
+        super(moduleDefineHolder);
         this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
         this.recordDAO = recordDAO;
         this.model = model;
@@ -57,7 +58,7 @@ public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<Top
         this.reportCycle = 10 * 60 * 1000L;
     }
 
-    @Override void onWork(TopN data) {
+    @Override public void cacheData(TopN data) {
         limitedSizeDataCache.writing();
         try {
             limitedSizeDataCache.add(data);
@@ -66,15 +67,6 @@ public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<Top
         }
     }
 
-    /**
-     * TopN is not following the batch size trigger mode. The memory cost of 
this worker is limited always.
-     *
-     * `onWork` method has been override, so this method would never be 
executed. No need to implement this method,
-     */
-    @Override public void cacheData(TopN data) {
-
-    }
-
     @Override public LimitedSizeDataCache<TopN> getCache() {
         return limitedSizeDataCache;
     }
@@ -84,8 +76,6 @@ public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<Top
      * time windows.
      *
      * Switch and persistent attempt happens based on reportCycle.
-     *
-     * @return
      */
     @Override public boolean flushAndSwitch() {
         long now = System.currentTimeMillis();
@@ -96,16 +86,14 @@ public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<Top
         return super.flushAndSwitch();
     }
 
-    @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> 
cache) {
-        List<Object> batchCollection = new LinkedList<>();
+    @Override public void prepareBatch(LimitedSizeDataCache<TopN> cache, 
List<PrepareRequest> prepareRequests) {
         cache.getLast().collection().forEach(record -> {
             try {
-                batchCollection.add(recordDAO.prepareBatchInsert(model, 
record));
+                prepareRequests.add(recordDAO.prepareBatchInsert(model, 
record));
             } catch (Throwable t) {
                 logger.error(t.getMessage(), t);
             }
         });
-        return batchCollection;
     }
 
     @Override public void in(TopN n) {
@@ -113,16 +101,17 @@ public class TopNWorker extends PersistenceWorker<TopN, 
LimitedSizeDataCache<Top
     }
 
     private class TopNConsumer implements IConsumer<TopN> {
+
         @Override public void init() {
 
         }
 
         @Override public void consume(List<TopN> data) {
-            /**
+            /*
              * TopN is not following the batch size trigger mode.
              * No need to implement this method, the memory size is limited 
always.
              */
-            data.forEach(row -> onWork(row));
+            data.forEach(TopNWorker.this::onWork);
         }
 
         @Override public void onError(List<TopN> data, Throwable t) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
index 2bd3a56..3c1ed0c 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
@@ -19,13 +19,14 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import java.util.List;
+import org.apache.skywalking.oap.server.library.client.request.*;
 
 /**
  * @author peng-yongsheng
  */
 public interface IBatchDAO extends DAO {
 
-    void asynchronous(List<?> collection);
+    void asynchronous(InsertRequest insertRequest);
 
-    void synchronous(List<?> collection);
+    void synchronous(List<PrepareRequest> prepareRequests);
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
index 312b89f..e8a0724 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
@@ -21,11 +21,12 @@ package org.apache.skywalking.oap.server.core.storage;
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 
 /**
  * @author peng-yongsheng
  */
-public interface IRecordDAO<INSERT> extends DAO {
+public interface IRecordDAO extends DAO {
 
-    INSERT prepareBatchInsert(Model model, Record record) throws IOException;
+    InsertRequest prepareBatchInsert(Model model, Record record) throws 
IOException;
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index ffe3480..cedd074 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.*;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.worker.*;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@@ -61,79 +62,40 @@ public enum PersistenceTimer {
 
         if (!isStarted) {
             Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
-                new RunnableWithExceptionProtection(() -> 
extractDataAndSaveRecord(batchDAO),
-                    t -> logger.error("Extract data and save record failure.", 
t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
-
-            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
-                new RunnableWithExceptionProtection(() -> 
extractDataAndSaveMetrics(batchDAO),
-                    t -> logger.error("Extract data and save metrics 
failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
+                new RunnableWithExceptionProtection(() -> 
extractDataAndSave(batchDAO),
+                    t -> logger.error("Extract data and save failure.", t)), 
5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
 
             this.isStarted = true;
         }
     }
 
-    private void extractDataAndSaveRecord(IBatchDAO batchDAO) {
+    private void extractDataAndSave(IBatchDAO batchDAO) {
         if (logger.isDebugEnabled()) {
-            logger.debug("Extract data and save record");
+            logger.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
         try {
             HistogramMetrics.Timer timer = prepareLatency.createTimer();
 
-            List records = new LinkedList();
+            List<PrepareRequest> prepareRequests = new LinkedList<>();
             try {
                 List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
-                
persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers());
                 
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+                
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
 
-                buildBatchCollection(persistenceWorkers, records);
-
-                if (debug) {
-                    logger.info("build batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
-                }
-            } finally {
-                timer.finish();
-            }
+                persistenceWorkers.forEach(worker -> {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("extract {} worker data and save", 
worker.getClass().getName());
+                    }
 
-            HistogramMetrics.Timer executeLatencyTimer = 
executeLatency.createTimer();
-            try {
-                if (CollectionUtils.isNotEmpty(records)) {
-                    batchDAO.asynchronous(records);
-                }
-            } finally {
-                executeLatencyTimer.finish();
-            }
-        } catch (Throwable e) {
-            errorCounter.inc();
-            logger.error(e.getMessage(), e);
-        } finally {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Persistence data save finish");
-            }
-        }
-
-        if (debug) {
-            logger.info("Batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
-        }
-    }
-
-    private void extractDataAndSaveMetrics(IBatchDAO batchDAO) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Extract data and save metrics");
-        }
-
-        long startTime = System.currentTimeMillis();
-        try {
-            HistogramMetrics.Timer timer = prepareLatency.createTimer();
-
-            List metrics = new LinkedList();
-            try {
-                List<PersistenceWorker> persistenceWorkers = new 
ArrayList<>(MetricsStreamProcessor.getInstance().getPersistentWorkers());
-                buildBatchCollection(persistenceWorkers, metrics);
+                    if (worker.flushAndSwitch()) {
+                        worker.buildBatchRequests(prepareRequests);
+                    }
+                });
 
                 if (debug) {
-                    logger.info("build metrics batch persistence duration: {} 
ms", System.currentTimeMillis() - startTime);
+                    logger.info("build batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
                 }
             } finally {
                 timer.finish();
@@ -141,8 +103,8 @@ public enum PersistenceTimer {
 
             HistogramMetrics.Timer executeLatencyTimer = 
executeLatency.createTimer();
             try {
-                if (CollectionUtils.isNotEmpty(metrics)) {
-                    batchDAO.synchronous(metrics);
+                if (CollectionUtils.isNotEmpty(prepareRequests)) {
+                    batchDAO.synchronous(prepareRequests);
                 }
             } finally {
                 executeLatencyTimer.finish();
@@ -160,23 +122,4 @@ public enum PersistenceTimer {
             logger.info("Batch persistence duration: {} ms", 
System.currentTimeMillis() - startTime);
         }
     }
-
-    @SuppressWarnings("unchecked")
-    private void buildBatchCollection(List<PersistenceWorker> 
persistenceWorkers, List collection) {
-        persistenceWorkers.forEach(worker -> {
-            if (logger.isDebugEnabled()) {
-                logger.debug("extract {} worker data and save", 
worker.getClass().getName());
-            }
-
-            if (worker.flushAndSwitch()) {
-                List<?> batchCollection = worker.buildBatchCollection();
-
-                if (logger.isDebugEnabled()) {
-                    logger.debug("extract {} worker data size: {}", 
worker.getClass().getName(), batchCollection.size());
-                }
-
-                collection.addAll(batchCollection);
-            }
-        });
-    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
index b513d38..35cc466 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
@@ -25,6 +25,7 @@ import 
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
  * @author peng-yongsheng
  */
 public abstract class AbstractWorker<INPUT> {
+    
     @Getter private final ModuleDefineHolder moduleDefineHolder;
 
     public AbstractWorker(ModuleDefineHolder moduleDefineHolder) {
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index d4d118d..da8f4e4 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -21,6 +21,7 @@ package 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 import java.util.List;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.client.request.*;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.elasticsearch.action.bulk.*;
 import org.elasticsearch.action.index.IndexRequest;
@@ -47,38 +48,23 @@ public class BatchProcessEsDAO extends EsDAO implements 
IBatchDAO {
         this.concurrentRequests = concurrentRequests;
     }
 
-    @Override public void asynchronous(List<?> collection) {
+    @Override public void asynchronous(InsertRequest insertRequest) {
         if (bulkProcessor == null) {
             this.bulkProcessor = getClient().createBulkProcessor(bulkActions, 
flushInterval, concurrentRequests);
         }
 
-        if (logger.isDebugEnabled()) {
-            logger.debug("Asynchronous batch persistent data collection size: 
{}", collection.size());
-        }
-
-        if (CollectionUtils.isNotEmpty(collection)) {
-            collection.forEach(builder -> {
-                if (builder instanceof IndexRequest) {
-                    this.bulkProcessor.add((IndexRequest)builder);
-                }
-                if (builder instanceof UpdateRequest) {
-                    this.bulkProcessor.add((UpdateRequest)builder);
-                }
-            });
-            this.bulkProcessor.flush();
-        }
+        this.bulkProcessor.add((IndexRequest)insertRequest);
     }
 
-    @Override public void synchronous(List<?> collection) {
-        if (CollectionUtils.isNotEmpty(collection)) {
+    @Override public void synchronous(List<PrepareRequest> prepareRequests) {
+        if (CollectionUtils.isNotEmpty(prepareRequests)) {
             BulkRequest request = new BulkRequest();
 
-            for (Object builder : collection) {
-                if (builder instanceof IndexRequest) {
-                    request.add((IndexRequest)builder);
-                }
-                if (builder instanceof UpdateRequest) {
-                    request.add((UpdateRequest)builder);
+            for (PrepareRequest prepareRequest : prepareRequests) {
+                if (prepareRequest instanceof InsertRequest) {
+                    request.add((IndexRequest)prepareRequest);
+                } else {
+                    request.add((UpdateRequest)prepareRequest);
                 }
             }
             getClient().synchronousBulk(request);
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
index 05bb6d2..2e7a864 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -23,13 +23,13 @@ import 
org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.elasticsearch.action.index.IndexRequest;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 /**
  * @author peng-yongsheng
  */
-public class RecordEsDAO extends EsDAO implements IRecordDAO<IndexRequest> {
+public class RecordEsDAO extends EsDAO implements IRecordDAO {
 
     private final StorageBuilder<Record> storageBuilder;
 
@@ -38,7 +38,7 @@ public class RecordEsDAO extends EsDAO implements 
IRecordDAO<IndexRequest> {
         this.storageBuilder = storageBuilder;
     }
 
-    @Override public IndexRequest prepareBatchInsert(Model model, Record 
record) throws IOException {
+    @Override public InsertRequest prepareBatchInsert(Model model, Record 
record) throws IOException {
         XContentBuilder builder = map2builder(storageBuilder.data2Map(record));
         String modelName = TimeSeriesUtils.timeSeries(model, 
record.getTimeBucket());
         return getClient().prepareInsert(modelName, record.id(), builder);
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
index bfc8148..279a6a8 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
@@ -18,16 +18,18 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
 
-import java.sql.Connection;
-import java.sql.SQLException;
+import java.sql.*;
 import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.client.request.*;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author wusheng, peng-yongsheng
@@ -37,24 +39,36 @@ public class H2BatchDAO implements IBatchDAO {
     private static final Logger logger = 
LoggerFactory.getLogger(H2BatchDAO.class);
 
     private JDBCHikariCPClient h2Client;
+    private final DataCarrier<PrepareRequest> dataCarrier;
 
     public H2BatchDAO(JDBCHikariCPClient h2Client) {
         this.h2Client = h2Client;
+
+        String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 
20);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+
+        this.dataCarrier = new DataCarrier<>(1, 10000);
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new 
H2BatchDAO.H2BatchConsumer(this));
     }
 
-    @Override public void synchronous(List<?> collection) {
-        if (CollectionUtils.isEmpty(collection)) {
+    @Override public void synchronous(List<PrepareRequest> prepareRequests) {
+        if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
 
         if (logger.isDebugEnabled()) {
-            logger.debug("batch sql statements execute, data size: {}", 
collection.size());
+            logger.debug("batch sql statements execute, data size: {}", 
prepareRequests.size());
         }
 
         try (Connection connection = h2Client.getConnection()) {
-            for (Object exe : collection) {
+            for (PrepareRequest prepareRequest : prepareRequests) {
                 try {
-                    SQLExecutor sqlExecutor = (SQLExecutor)exe;
+                    SQLExecutor sqlExecutor = (SQLExecutor)prepareRequest;
                     sqlExecutor.invoke(connection);
                 } catch (SQLException e) {
                     // Just avoid one execution failure makes the rest of 
batch failure.
@@ -66,7 +80,31 @@ public class H2BatchDAO implements IBatchDAO {
         }
     }
 
-    @Override public void asynchronous(List<?> collection) {
-        synchronous(collection);
+    @Override public void asynchronous(InsertRequest insertRequest) {
+        this.dataCarrier.produce(insertRequest);
+    }
+
+    private class H2BatchConsumer implements IConsumer<PrepareRequest> {
+
+        private final H2BatchDAO h2BatchDAO;
+
+        private H2BatchConsumer(H2BatchDAO h2BatchDAO) {
+            this.h2BatchDAO = h2BatchDAO;
+        }
+
+        @Override public void init() {
+
+        }
+
+        @Override public void consume(List<PrepareRequest> prepareRequests) {
+            h2BatchDAO.synchronous(prepareRequests);
+        }
+
+        @Override public void onError(List<PrepareRequest> prepareRequests, 
Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        @Override public void onExit() {
+        }
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
index aa4c375..39158b4 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
@@ -23,12 +23,12 @@ import 
org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 
 /**
  * @author wusheng
  */
-public class H2RecordDAO extends H2SQLExecutor implements 
IRecordDAO<SQLExecutor> {
+public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO {
 
     private JDBCHikariCPClient h2Client;
     private StorageBuilder<Record> storageBuilder;
@@ -38,7 +38,7 @@ public class H2RecordDAO extends H2SQLExecutor implements 
IRecordDAO<SQLExecutor
         this.storageBuilder = storageBuilder;
     }
 
-    @Override public SQLExecutor prepareBatchInsert(Model model, Record 
record) throws IOException {
+    @Override public InsertRequest prepareBatchInsert(Model model, Record 
record) throws IOException {
         return getInsertExecutor(model.getName(), record, storageBuilder);
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
index 142770d..ff86eb2 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
@@ -19,32 +19,23 @@
 package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.sql.*;
+import java.util.*;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.ArrayParamBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.*;
+import org.slf4j.*;
 
 /**
  * @author wusheng, peng-yongsheng
  */
 public class H2SQLExecutor {
+    
     private static final Logger logger = 
LoggerFactory.getLogger(H2SQLExecutor.class);
 
     protected List<StorageData> getByIDs(JDBCHikariCPClient h2Client, String 
modelName, String[] ids,
@@ -67,6 +58,7 @@ public class H2SQLExecutor {
                     }
                 }
                 while (storageData != null);
+
                 return storageDataList;
             }
         } catch (SQLException | JDBCClientException e) {
@@ -96,8 +88,7 @@ public class H2SQLExecutor {
         }
     }
 
-    protected StorageData toStorageData(ResultSet rs, String modelName,
-        StorageBuilder storageBuilder) throws SQLException {
+    protected StorageData toStorageData(ResultSet rs, String modelName, 
StorageBuilder storageBuilder) throws SQLException {
         if (rs.next()) {
             Map data = new HashMap();
             List<ModelColumn> columns = 
TableMetaInfo.get(modelName).getColumns();
@@ -122,8 +113,7 @@ public class H2SQLExecutor {
         return Const.NONE;
     }
 
-    protected SQLExecutor getInsertExecutor(String modelName, StorageData 
metrics,
-        StorageBuilder storageBuilder) throws IOException {
+    protected SQLExecutor getInsertExecutor(String modelName, StorageData 
metrics, StorageBuilder storageBuilder) throws IOException {
         Map<String, Object> objectMap = storageBuilder.data2Map(metrics);
 
         SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + " 
VALUES");
@@ -150,8 +140,7 @@ public class H2SQLExecutor {
         return new SQLExecutor(sqlBuilder.toString(), param);
     }
 
-    protected SQLExecutor getUpdateExecutor(String modelName, StorageData 
metrics,
-        StorageBuilder storageBuilder) throws IOException {
+    protected SQLExecutor getUpdateExecutor(String modelName, StorageData 
metrics, StorageBuilder storageBuilder) throws IOException {
         Map<String, Object> objectMap = storageBuilder.data2Map(metrics);
 
         SQLBuilder sqlBuilder = new SQLBuilder("UPDATE " + modelName + " SET 
");

Reply via email to