This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 8306ad4 Reduce the number of threads to improve performance. (#3133)
8306ad4 is described below
commit 8306ad45318ba1679c118658fb3761c62801b33f
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Sun Jul 21 15:51:24 2019 +0800
Reduce the number of threads to improve performance. (#3133)
* Refactor Persistence worker.
* 1. Provide InsertRequest and UpdateRequest interfaces for preparing
persistence.
2. Implement the ids query for H2 metrics DAO.
* Refactor worker framework
* Use queue to receive asynchronous batch request.
* Rename the DataCarrier thread name.
* Fixed some mistakes.
* New mistake.
---
.../analysis/worker/MetricsPersistentWorker.java | 45 +++--------
.../analysis/worker/MetricsStreamProcessor.java | 27 ++-----
.../core/analysis/worker/PersistenceWorker.java | 32 ++------
.../analysis/worker/RecordPersistentWorker.java | 86 ++++----------------
.../analysis/worker/RecordStreamProcessor.java | 6 +-
.../server/core/analysis/worker/TopNWorker.java | 33 +++-----
.../oap/server/core/storage/IBatchDAO.java | 5 +-
.../oap/server/core/storage/IRecordDAO.java | 5 +-
.../oap/server/core/storage/PersistenceTimer.java | 93 +++++-----------------
.../oap/server/core/worker/AbstractWorker.java | 1 +
.../elasticsearch/base/BatchProcessEsDAO.java | 34 +++-----
.../plugin/elasticsearch/base/RecordEsDAO.java | 6 +-
.../storage/plugin/jdbc/h2/dao/H2BatchDAO.java | 60 +++++++++++---
.../storage/plugin/jdbc/h2/dao/H2RecordDAO.java | 6 +-
.../storage/plugin/jdbc/h2/dao/H2SQLExecutor.java | 31 +++-----
15 files changed, 150 insertions(+), 320 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 14135dc..34e6436 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -28,6 +28,7 @@ import
org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
@@ -45,10 +46,9 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
private final AbstractWorker<ExportEvent> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
- MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model
model, int batchSize,
- IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
+ MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model
model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
AbstractWorker<ExportEvent> nextExportWorker) {
- super(moduleDefineHolder, batchSize);
+ super(moduleDefineHolder);
this.model = model;
this.mergeDataCache = new MergeDataCache<>();
this.metricsDAO = metricsDAO;
@@ -76,7 +76,6 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
}
@Override public void in(Metrics metrics) {
- metrics.resetEndOfBatch();
dataCarrier.produce(metrics);
}
@@ -84,23 +83,9 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
return mergeDataCache;
}
- public boolean flushAndSwitch() {
- boolean isSwitch;
- try {
- if (isSwitch = getCache().trySwitchPointer()) {
- getCache().switchPointer();
- }
- } finally {
- getCache().trySwitchPointerFinally();
- }
- return isSwitch;
- }
-
- @Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
+ @Override public void prepareBatch(MergeDataCache<Metrics> cache,
List<PrepareRequest> prepareRequests) {
long start = System.currentTimeMillis();
- List<Object> batchCollection = new LinkedList<>();
-
Collection<Metrics> collection = cache.getLast().collection();
int i = 0;
@@ -131,9 +116,9 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
if (dbMetricsMap.containsKey(metric.id())) {
metric.combine(dbMetricsMap.get(metric.id()));
metric.calculate();
-
batchCollection.add(metricsDAO.prepareBatchUpdate(model, metric));
+
prepareRequests.add(metricsDAO.prepareBatchUpdate(model, metric));
} else {
-
batchCollection.add(metricsDAO.prepareBatchInsert(model, metric));
+
prepareRequests.add(metricsDAO.prepareBatchInsert(model, metric));
}
if (Objects.nonNull(nextAlarmWorker)) {
@@ -152,11 +137,9 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
i++;
}
- if (batchCollection.size() > 0) {
- logger.debug("prepareBatch model {}, took time: {}",
model.getName(), System.currentTimeMillis() - start);
+ if (prepareRequests.size() > 0) {
+ logger.debug("prepare batch requests for model {}, took time: {}",
model.getName(), System.currentTimeMillis() - start);
}
-
- return batchCollection;
}
@Override public void cacheData(Metrics input) {
@@ -186,17 +169,7 @@ public class MetricsPersistentWorker extends
PersistenceWorker<Metrics, MergeDat
}
@Override public void consume(List<Metrics> data) {
- Iterator<Metrics> inputIterator = data.iterator();
-
- int i = 0;
- while (inputIterator.hasNext()) {
- Metrics metrics = inputIterator.next();
- i++;
- if (i == data.size()) {
- metrics.asEndOfBatch();
- }
- persistent.onWork(metrics);
- }
+ data.forEach(persistent::onWork);
}
@Override public void onError(List<Metrics> data, Throwable t) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 78d0398..e81906e 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -18,25 +18,15 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import lombok.Getter;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
-import org.apache.skywalking.oap.server.core.analysis.Downsampling;
-import org.apache.skywalking.oap.server.core.analysis.Stream;
-import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
+import org.apache.skywalking.oap.server.core.*;
+import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
-import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -61,6 +51,7 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
}
}
+ @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
Class<? extends Metrics> metricsClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
@@ -114,16 +105,14 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
AlarmNotifyWorker alarmNotifyWorker = new
AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
- MetricsPersistentWorker minutePersistentWorker = new
MetricsPersistentWorker(moduleDefineHolder, model,
- 1000, metricsDAO, alarmNotifyWorker, exportWorker);
+ MetricsPersistentWorker minutePersistentWorker = new
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO,
alarmNotifyWorker, exportWorker);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
private MetricsPersistentWorker worker(ModuleDefineHolder
moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
- MetricsPersistentWorker persistentWorker = new
MetricsPersistentWorker(moduleDefineHolder, model,
- 1000, metricsDAO, null, null);
+ MetricsPersistentWorker persistentWorker = new
MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null);
persistentWorkers.add(persistentWorker);
return persistentWorker;
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index e8ac608..166eb9c 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -18,10 +18,11 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.*;
+import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.data.Window;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
@@ -32,28 +33,11 @@ public abstract class PersistenceWorker<INPUT extends
StorageData, CACHE extends
private static final Logger logger =
LoggerFactory.getLogger(PersistenceWorker.class);
- private final int batchSize;
- private final IBatchDAO batchDAO;
-
- PersistenceWorker(ModuleDefineHolder moduleDefineHolder, int batchSize) {
+ PersistenceWorker(ModuleDefineHolder moduleDefineHolder) {
super(moduleDefineHolder);
- this.batchSize = batchSize;
- this.batchDAO =
moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
}
void onWork(INPUT input) {
- if (getCache().currentCollectionSize() >= batchSize) {
- try {
- if (getCache().trySwitchPointer()) {
- getCache().switchPointer();
-
- List<?> collection = buildBatchCollection();
- batchDAO.asynchronous(collection);
- }
- } finally {
- getCache().trySwitchPointerFinally();
- }
- }
cacheData(input);
}
@@ -73,10 +57,9 @@ public abstract class PersistenceWorker<INPUT extends
StorageData, CACHE extends
return isSwitch;
}
- public abstract List<Object> prepareBatch(CACHE cache);
+ public abstract void prepareBatch(CACHE cache, List<PrepareRequest>
prepareRequests);
- public final List<?> buildBatchCollection() {
- List<?> batchCollection = new LinkedList<>();
+ public final void buildBatchRequests(List<PrepareRequest> prepareRequests)
{
try {
while (getCache().getLast().isWriting()) {
try {
@@ -87,11 +70,10 @@ public abstract class PersistenceWorker<INPUT extends
StorageData, CACHE extends
}
if (getCache().getLast().collection() != null) {
- batchCollection = prepareBatch(getCache());
+ prepareBatch(getCache(), prepareRequests);
}
} finally {
getCache().finishReadingLast();
}
- return batchCollection;
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 16a0a3c..f77de2b 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -18,97 +18,39 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.*;
-import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
+import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public class RecordPersistentWorker extends PersistenceWorker<Record,
NonMergeDataCache<Record>> {
+public class RecordPersistentWorker extends AbstractWorker<Record> {
private static final Logger logger =
LoggerFactory.getLogger(RecordPersistentWorker.class);
private final Model model;
- private final NonMergeDataCache<Record> nonMergeDataCache;
private final IRecordDAO recordDAO;
- private final DataCarrier<Record> dataCarrier;
+ private final IBatchDAO batchDAO;
- RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model,
int batchSize,
- IRecordDAO recordDAO) {
- super(moduleDefineHolder, batchSize);
+ RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model,
IRecordDAO recordDAO) {
+ super(moduleDefineHolder);
this.model = model;
- this.nonMergeDataCache = new NonMergeDataCache<>();
this.recordDAO = recordDAO;
-
- String name = "RECORD_PERSISTENT";
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1,
20);
- try {
- ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
- } catch (Exception e) {
- throw new UnexpectedException(e.getMessage(), e);
- }
-
- this.dataCarrier = new DataCarrier<>(1, 10000);
- this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new
RecordPersistentWorker.PersistentConsumer(this));
+ this.batchDAO =
moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
}
@Override public void in(Record record) {
- dataCarrier.produce(record);
- }
-
- @Override public NonMergeDataCache<Record> getCache() {
- return nonMergeDataCache;
- }
-
- @Override public List<Object> prepareBatch(NonMergeDataCache<Record>
cache) {
- List<Object> batchCollection = new LinkedList<>();
- cache.getLast().collection().forEach(record -> {
- try {
- batchCollection.add(recordDAO.prepareBatchInsert(model,
record));
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
- });
- return batchCollection;
- }
-
- @Override public void cacheData(Record input) {
- nonMergeDataCache.writing();
- nonMergeDataCache.add(input);
- nonMergeDataCache.finishWriting();
- }
-
- private class PersistentConsumer implements IConsumer<Record> {
-
- private final RecordPersistentWorker persistent;
-
- private PersistentConsumer(RecordPersistentWorker persistent) {
- this.persistent = persistent;
- }
-
- @Override public void init() {
-
- }
-
- @Override public void consume(List<Record> data) {
- for (Record record : data) {
- persistent.onWork(record);
- }
- }
-
- @Override public void onError(List<Record> data, Throwable t) {
- logger.error(t.getMessage(), t);
- }
-
- @Override public void onExit() {
+ try {
+ InsertRequest insertRequest = recordDAO.prepareBatchInsert(model,
record);
+ batchDAO.asynchronous(insertRequest);
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
}
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index b84d99b..3397f75 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
-import lombok.Getter;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
@@ -48,8 +47,6 @@ public class RecordStreamProcessor implements
StreamProcessor<Record> {
}
}
- @Getter private List<RecordPersistentWorker> persistentWorkers = new
ArrayList<>();
-
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
Class<? extends Record> recordClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
@@ -66,9 +63,8 @@ public class RecordStreamProcessor implements
StreamProcessor<Record> {
IModelSetter modelSetter =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(),
new Storage(stream.name(), true, true, Downsampling.Second), true);
- RecordPersistentWorker persistentWorker = new
RecordPersistentWorker(moduleDefineHolder, model, 4000, recordDAO);
+ RecordPersistentWorker persistentWorker = new
RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
- persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker);
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
index 37a8265..0287f07 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java
@@ -18,20 +18,21 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
-import java.util.*;
+import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import
org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
* Top N worker is a persistence worker, but no
*
- * @author wusheng
+ * @author wusheng, peng-yongsheng
*/
public class TopNWorker extends PersistenceWorker<TopN,
LimitedSizeDataCache<TopN>> {
@@ -44,9 +45,9 @@ public class TopNWorker extends PersistenceWorker<TopN,
LimitedSizeDataCache<Top
private long reportCycle;
private volatile long lastReportTimestamp;
- public TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
+ TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
int topNSize, IRecordDAO recordDAO) {
- super(moduleDefineHolder, -1);
+ super(moduleDefineHolder);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO;
this.model = model;
@@ -57,7 +58,7 @@ public class TopNWorker extends PersistenceWorker<TopN,
LimitedSizeDataCache<Top
this.reportCycle = 10 * 60 * 1000L;
}
- @Override void onWork(TopN data) {
+ @Override public void cacheData(TopN data) {
limitedSizeDataCache.writing();
try {
limitedSizeDataCache.add(data);
@@ -66,15 +67,6 @@ public class TopNWorker extends PersistenceWorker<TopN,
LimitedSizeDataCache<Top
}
}
- /**
- * TopN is not following the batch size trigger mode. The memory cost of
this worker is limited always.
- *
- * `onWork` method has been override, so this method would never be
executed. No need to implement this method,
- */
- @Override public void cacheData(TopN data) {
-
- }
-
@Override public LimitedSizeDataCache<TopN> getCache() {
return limitedSizeDataCache;
}
@@ -84,8 +76,6 @@ public class TopNWorker extends PersistenceWorker<TopN,
LimitedSizeDataCache<Top
* time windows.
*
* Switch and persistent attempt happens based on reportCycle.
- *
- * @return
*/
@Override public boolean flushAndSwitch() {
long now = System.currentTimeMillis();
@@ -96,16 +86,14 @@ public class TopNWorker extends PersistenceWorker<TopN,
LimitedSizeDataCache<Top
return super.flushAndSwitch();
}
- @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN>
cache) {
- List<Object> batchCollection = new LinkedList<>();
+ @Override public void prepareBatch(LimitedSizeDataCache<TopN> cache,
List<PrepareRequest> prepareRequests) {
cache.getLast().collection().forEach(record -> {
try {
- batchCollection.add(recordDAO.prepareBatchInsert(model,
record));
+ prepareRequests.add(recordDAO.prepareBatchInsert(model,
record));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
});
- return batchCollection;
}
@Override public void in(TopN n) {
@@ -113,16 +101,17 @@ public class TopNWorker extends PersistenceWorker<TopN,
LimitedSizeDataCache<Top
}
private class TopNConsumer implements IConsumer<TopN> {
+
@Override public void init() {
}
@Override public void consume(List<TopN> data) {
- /**
+ /*
* TopN is not following the batch size trigger mode.
* No need to implement this method, the memory size is limited
always.
*/
- data.forEach(row -> onWork(row));
+ data.forEach(TopNWorker.this::onWork);
}
@Override public void onError(List<TopN> data, Throwable t) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
index 2bd3a56..3c1ed0c 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
@@ -19,13 +19,14 @@
package org.apache.skywalking.oap.server.core.storage;
import java.util.List;
+import org.apache.skywalking.oap.server.library.client.request.*;
/**
* @author peng-yongsheng
*/
public interface IBatchDAO extends DAO {
- void asynchronous(List<?> collection);
+ void asynchronous(InsertRequest insertRequest);
- void synchronous(List<?> collection);
+ void synchronous(List<PrepareRequest> prepareRequests);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
index 312b89f..e8a0724 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
@@ -21,11 +21,12 @@ package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
/**
* @author peng-yongsheng
*/
-public interface IRecordDAO<INSERT> extends DAO {
+public interface IRecordDAO extends DAO {
- INSERT prepareBatchInsert(Model model, Record record) throws IOException;
+ InsertRequest prepareBatchInsert(Model model, Record record) throws
IOException;
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index ffe3480..cedd074 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.worker.*;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@@ -61,79 +62,40 @@ public enum PersistenceTimer {
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
- new RunnableWithExceptionProtection(() ->
extractDataAndSaveRecord(batchDAO),
- t -> logger.error("Extract data and save record failure.",
t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
-
- Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
- new RunnableWithExceptionProtection(() ->
extractDataAndSaveMetrics(batchDAO),
- t -> logger.error("Extract data and save metrics
failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
+ new RunnableWithExceptionProtection(() ->
extractDataAndSave(batchDAO),
+ t -> logger.error("Extract data and save failure.", t)),
5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
this.isStarted = true;
}
}
- private void extractDataAndSaveRecord(IBatchDAO batchDAO) {
+ private void extractDataAndSave(IBatchDAO batchDAO) {
if (logger.isDebugEnabled()) {
- logger.debug("Extract data and save record");
+ logger.debug("Extract data and save");
}
long startTime = System.currentTimeMillis();
try {
HistogramMetrics.Timer timer = prepareLatency.createTimer();
- List records = new LinkedList();
+ List<PrepareRequest> prepareRequests = new LinkedList<>();
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
-
persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
- buildBatchCollection(persistenceWorkers, records);
-
- if (debug) {
- logger.info("build batch persistence duration: {} ms",
System.currentTimeMillis() - startTime);
- }
- } finally {
- timer.finish();
- }
+ persistenceWorkers.forEach(worker -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("extract {} worker data and save",
worker.getClass().getName());
+ }
- HistogramMetrics.Timer executeLatencyTimer =
executeLatency.createTimer();
- try {
- if (CollectionUtils.isNotEmpty(records)) {
- batchDAO.asynchronous(records);
- }
- } finally {
- executeLatencyTimer.finish();
- }
- } catch (Throwable e) {
- errorCounter.inc();
- logger.error(e.getMessage(), e);
- } finally {
- if (logger.isDebugEnabled()) {
- logger.debug("Persistence data save finish");
- }
- }
-
- if (debug) {
- logger.info("Batch persistence duration: {} ms",
System.currentTimeMillis() - startTime);
- }
- }
-
- private void extractDataAndSaveMetrics(IBatchDAO batchDAO) {
- if (logger.isDebugEnabled()) {
- logger.debug("Extract data and save metrics");
- }
-
- long startTime = System.currentTimeMillis();
- try {
- HistogramMetrics.Timer timer = prepareLatency.createTimer();
-
- List metrics = new LinkedList();
- try {
- List<PersistenceWorker> persistenceWorkers = new
ArrayList<>(MetricsStreamProcessor.getInstance().getPersistentWorkers());
- buildBatchCollection(persistenceWorkers, metrics);
+ if (worker.flushAndSwitch()) {
+ worker.buildBatchRequests(prepareRequests);
+ }
+ });
if (debug) {
- logger.info("build metrics batch persistence duration: {}
ms", System.currentTimeMillis() - startTime);
+ logger.info("build batch persistence duration: {} ms",
System.currentTimeMillis() - startTime);
}
} finally {
timer.finish();
@@ -141,8 +103,8 @@ public enum PersistenceTimer {
HistogramMetrics.Timer executeLatencyTimer =
executeLatency.createTimer();
try {
- if (CollectionUtils.isNotEmpty(metrics)) {
- batchDAO.synchronous(metrics);
+ if (CollectionUtils.isNotEmpty(prepareRequests)) {
+ batchDAO.synchronous(prepareRequests);
}
} finally {
executeLatencyTimer.finish();
@@ -160,23 +122,4 @@ public enum PersistenceTimer {
logger.info("Batch persistence duration: {} ms",
System.currentTimeMillis() - startTime);
}
}
-
- @SuppressWarnings("unchecked")
- private void buildBatchCollection(List<PersistenceWorker>
persistenceWorkers, List collection) {
- persistenceWorkers.forEach(worker -> {
- if (logger.isDebugEnabled()) {
- logger.debug("extract {} worker data and save",
worker.getClass().getName());
- }
-
- if (worker.flushAndSwitch()) {
- List<?> batchCollection = worker.buildBatchCollection();
-
- if (logger.isDebugEnabled()) {
- logger.debug("extract {} worker data size: {}",
worker.getClass().getName(), batchCollection.size());
- }
-
- collection.addAll(batchCollection);
- }
- });
- }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
index b513d38..35cc466 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/AbstractWorker.java
@@ -25,6 +25,7 @@ import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
* @author peng-yongsheng
*/
public abstract class AbstractWorker<INPUT> {
+
@Getter private final ModuleDefineHolder moduleDefineHolder;
public AbstractWorker(ModuleDefineHolder moduleDefineHolder) {
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index d4d118d..da8f4e4 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -21,6 +21,7 @@ package
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.List;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.client.request.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
@@ -47,38 +48,23 @@ public class BatchProcessEsDAO extends EsDAO implements
IBatchDAO {
this.concurrentRequests = concurrentRequests;
}
- @Override public void asynchronous(List<?> collection) {
+ @Override public void asynchronous(InsertRequest insertRequest) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(bulkActions,
flushInterval, concurrentRequests);
}
- if (logger.isDebugEnabled()) {
- logger.debug("Asynchronous batch persistent data collection size:
{}", collection.size());
- }
-
- if (CollectionUtils.isNotEmpty(collection)) {
- collection.forEach(builder -> {
- if (builder instanceof IndexRequest) {
- this.bulkProcessor.add((IndexRequest)builder);
- }
- if (builder instanceof UpdateRequest) {
- this.bulkProcessor.add((UpdateRequest)builder);
- }
- });
- this.bulkProcessor.flush();
- }
+ this.bulkProcessor.add((IndexRequest)insertRequest);
}
- @Override public void synchronous(List<?> collection) {
- if (CollectionUtils.isNotEmpty(collection)) {
+ @Override public void synchronous(List<PrepareRequest> prepareRequests) {
+ if (CollectionUtils.isNotEmpty(prepareRequests)) {
BulkRequest request = new BulkRequest();
- for (Object builder : collection) {
- if (builder instanceof IndexRequest) {
- request.add((IndexRequest)builder);
- }
- if (builder instanceof UpdateRequest) {
- request.add((UpdateRequest)builder);
+ for (PrepareRequest prepareRequest : prepareRequests) {
+ if (prepareRequest instanceof InsertRequest) {
+ request.add((IndexRequest)prepareRequest);
+ } else {
+ request.add((UpdateRequest)prepareRequest);
}
}
getClient().synchronousBulk(request);
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
index 05bb6d2..2e7a864 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -23,13 +23,13 @@ import
org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.elasticsearch.action.index.IndexRequest;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
/**
* @author peng-yongsheng
*/
-public class RecordEsDAO extends EsDAO implements IRecordDAO<IndexRequest> {
+public class RecordEsDAO extends EsDAO implements IRecordDAO {
private final StorageBuilder<Record> storageBuilder;
@@ -38,7 +38,7 @@ public class RecordEsDAO extends EsDAO implements
IRecordDAO<IndexRequest> {
this.storageBuilder = storageBuilder;
}
- @Override public IndexRequest prepareBatchInsert(Model model, Record
record) throws IOException {
+ @Override public InsertRequest prepareBatchInsert(Model model, Record
record) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.data2Map(record));
String modelName = TimeSeriesUtils.timeSeries(model,
record.getTimeBucket());
return getClient().prepareInsert(modelName, record.id(), builder);
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
index bfc8148..279a6a8 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
@@ -18,16 +18,18 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
-import java.sql.Connection;
-import java.sql.SQLException;
+import java.sql.*;
import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.client.request.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author wusheng, peng-yongsheng
@@ -37,24 +39,36 @@ public class H2BatchDAO implements IBatchDAO {
private static final Logger logger =
LoggerFactory.getLogger(H2BatchDAO.class);
private JDBCHikariCPClient h2Client;
+ private final DataCarrier<PrepareRequest> dataCarrier;
public H2BatchDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
+
+ String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1,
20);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+
+ this.dataCarrier = new DataCarrier<>(1, 10000);
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new
H2BatchDAO.H2BatchConsumer(this));
}
- @Override public void synchronous(List<?> collection) {
- if (CollectionUtils.isEmpty(collection)) {
+ @Override public void synchronous(List<PrepareRequest> prepareRequests) {
+ if (CollectionUtils.isEmpty(prepareRequests)) {
return;
}
if (logger.isDebugEnabled()) {
- logger.debug("batch sql statements execute, data size: {}",
collection.size());
+ logger.debug("batch sql statements execute, data size: {}",
prepareRequests.size());
}
try (Connection connection = h2Client.getConnection()) {
- for (Object exe : collection) {
+ for (PrepareRequest prepareRequest : prepareRequests) {
try {
- SQLExecutor sqlExecutor = (SQLExecutor)exe;
+ SQLExecutor sqlExecutor = (SQLExecutor)prepareRequest;
sqlExecutor.invoke(connection);
} catch (SQLException e) {
// Just avoid one execution failure makes the rest of
batch failure.
@@ -66,7 +80,31 @@ public class H2BatchDAO implements IBatchDAO {
}
}
- @Override public void asynchronous(List<?> collection) {
- synchronous(collection);
+ @Override public void asynchronous(InsertRequest insertRequest) {
+ this.dataCarrier.produce(insertRequest);
+ }
+
+ private class H2BatchConsumer implements IConsumer<PrepareRequest> {
+
+ private final H2BatchDAO h2BatchDAO;
+
+ private H2BatchConsumer(H2BatchDAO h2BatchDAO) {
+ this.h2BatchDAO = h2BatchDAO;
+ }
+
+ @Override public void init() {
+
+ }
+
+ @Override public void consume(List<PrepareRequest> prepareRequests) {
+ h2BatchDAO.synchronous(prepareRequests);
+ }
+
+ @Override public void onError(List<PrepareRequest> prepareRequests,
Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+
+ @Override public void onExit() {
+ }
}
}
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
index aa4c375..39158b4 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java
@@ -23,12 +23,12 @@ import
org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
/**
* @author wusheng
*/
-public class H2RecordDAO extends H2SQLExecutor implements
IRecordDAO<SQLExecutor> {
+public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO {
private JDBCHikariCPClient h2Client;
private StorageBuilder<Record> storageBuilder;
@@ -38,7 +38,7 @@ public class H2RecordDAO extends H2SQLExecutor implements
IRecordDAO<SQLExecutor
this.storageBuilder = storageBuilder;
}
- @Override public SQLExecutor prepareBatchInsert(Model model, Record
record) throws IOException {
+ @Override public InsertRequest prepareBatchInsert(Model model, Record
record) throws IOException {
return getInsertExecutor(model.getName(), record, storageBuilder);
}
}
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
index 142770d..ff86eb2 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
@@ -19,32 +19,23 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.sql.*;
+import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.ArrayParamBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.*;
+import org.slf4j.*;
/**
* @author wusheng, peng-yongsheng
*/
public class H2SQLExecutor {
+
private static final Logger logger =
LoggerFactory.getLogger(H2SQLExecutor.class);
protected List<StorageData> getByIDs(JDBCHikariCPClient h2Client, String
modelName, String[] ids,
@@ -67,6 +58,7 @@ public class H2SQLExecutor {
}
}
while (storageData != null);
+
return storageDataList;
}
} catch (SQLException | JDBCClientException e) {
@@ -96,8 +88,7 @@ public class H2SQLExecutor {
}
}
- protected StorageData toStorageData(ResultSet rs, String modelName,
- StorageBuilder storageBuilder) throws SQLException {
+ protected StorageData toStorageData(ResultSet rs, String modelName,
StorageBuilder storageBuilder) throws SQLException {
if (rs.next()) {
Map data = new HashMap();
List<ModelColumn> columns =
TableMetaInfo.get(modelName).getColumns();
@@ -122,8 +113,7 @@ public class H2SQLExecutor {
return Const.NONE;
}
- protected SQLExecutor getInsertExecutor(String modelName, StorageData
metrics,
- StorageBuilder storageBuilder) throws IOException {
+ protected SQLExecutor getInsertExecutor(String modelName, StorageData
metrics, StorageBuilder storageBuilder) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(metrics);
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + "
VALUES");
@@ -150,8 +140,7 @@ public class H2SQLExecutor {
return new SQLExecutor(sqlBuilder.toString(), param);
}
- protected SQLExecutor getUpdateExecutor(String modelName, StorageData
metrics,
- StorageBuilder storageBuilder) throws IOException {
+ protected SQLExecutor getUpdateExecutor(String modelName, StorageData
metrics, StorageBuilder storageBuilder) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(metrics);
SQLBuilder sqlBuilder = new SQLBuilder("UPDATE " + modelName + " SET
");