This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch metric-cache in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit dcd30ae61745299a89480a1a1edadd5efba3506f Author: Wu Sheng <[email protected]> AuthorDate: Thu Nov 24 15:37:59 2022 +0800 Enhance cache mechanism in the metric persistent process --- docs/en/changes/changes.md | 6 ++ .../analysis/worker/MetricsPersistentWorker.java | 66 +++++++++++++++------- .../oap/server/core/storage/IMetricsDAO.java | 5 +- .../server/core/storage/SessionCacheCallback.java | 50 ++++++++++++++++ .../client/elasticsearch/IndexRequestWrapper.java | 13 ++++- .../client/elasticsearch/UpdateRequestWrapper.java | 14 ++++- .../library/client/request/InsertRequest.java | 1 + .../library/client/request/UpdateRequest.java | 1 + .../storage/plugin/banyandb/BanyanDBBatchDAO.java | 23 ++++++-- .../measure/BanyanDBMeasureInsertRequest.java | 7 +++ .../measure/BanyanDBMeasureUpdateRequest.java | 7 +++ .../banyandb/measure/BanyanDBMetricsDAO.java | 9 +-- .../stream/BanyanDBStreamInsertRequest.java | 5 ++ .../elasticsearch/base/BatchProcessEsDAO.java | 16 +++++- .../base/MetricIndexRequestWrapper.java} | 35 ++++++------ .../base/MetricIndexUpdateWrapper.java} | 35 ++++++------ .../plugin/elasticsearch/base/MetricsEsDAO.java | 9 +-- .../storage/plugin/jdbc/BatchSQLExecutor.java | 45 ++++++++++++--- .../server/storage/plugin/jdbc/SQLExecutor.java | 36 +++++++----- .../plugin/jdbc/common/dao/JDBCManagementDAO.java | 2 +- .../plugin/jdbc/common/dao/JDBCMetricsDAO.java | 9 +-- .../plugin/jdbc/common/dao/JDBCNoneStreamDAO.java | 2 +- .../plugin/jdbc/common/dao/JDBCRecordDAO.java | 2 +- .../plugin/jdbc/common/dao/JDBCSQLExecutor.java | 28 +++++---- .../common/dao/JDBCUITemplateManagementDAO.java | 4 +- .../shardingsphere/ShardingIntegrationTest.java | 2 +- 26 files changed, 318 insertions(+), 114 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index e865f06135..e4f4e855c9 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -119,6 +119,12 @@ * Support dynamic config the sampling strategy in network 
profiling. * Zipkin module support BanyanDB storage. * Zipkin traces query API, sort the result set by start time by default. +* Enhance cache mechanism in the metric persistent process. + * Previously, this cache only worked when the metric was accessible (readable) from the database. Once the insert execution was delayed + due to the scale, the cache would lose efficacy. It only worked for the last update per minute, considering our + 25s period. + * Fix ID conflicts for all JDBC storage implementations. Due to insert delay, the JDBC storage implementation would + still generate another new insert statement. #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 4725f749e2..f4b755e8ed 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -20,27 +20,29 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; -import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.UnexpectedException; import 
org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData; import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; +import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; @@ -58,7 +60,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0; private final Model model; - private final Map<Metrics, Metrics> context; + /** + * The session cache holds the latest metrics in-memory. + * There are two ways to make sure metrics in-cache, + * 1. Metrics is read from the Database through {@link #loadFromStorage(List)} + * 2. The built {@link InsertRequest} executed successfully. 
+ */ + private final Map<Metrics, Metrics> sessionCache; private final IMetricsDAO metricsDAO; private final Optional<AbstractWorker<Metrics>> nextAlarmWorker; private final Optional<AbstractWorker<ExportEvent>> nextExportWorker; @@ -89,7 +97,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) { super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData())); this.model = model; - this.context = new HashMap<>(100); + // Because the cache is updated by the final storage implementation, + // the map/cache could be updated concurrently. + // Set to ConcurrentHashMap in order to avoid HashMap deadlock. + // Since 9.4.0 + this.sessionCache = new ConcurrentHashMap<>(100); this.enableDatabaseSession = enableDatabaseSession; this.metricsDAO = metricsDAO; this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker); @@ -192,12 +204,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { metricsList.add(data); if (metricsList.size() == batchSize) { - flushDataToStorage(metricsList, prepareRequests); + prepareFlushDataToStorage(metricsList, prepareRequests); } } if (metricsList.size() > 0) { - flushDataToStorage(metricsList, prepareRequests); + prepareFlushDataToStorage(metricsList, prepareRequests); } if (prepareRequests.size() > 0) { @@ -209,14 +221,20 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { return prepareRequests; } - private void flushDataToStorage(List<Metrics> metricsList, - List<PrepareRequest> prepareRequests) { + /** + * Build given prepareRequests to prepare database flush + * + * @param metricsList the metrics in the last read from the in-memory aggregated cache. + * @param prepareRequests the results for final execution. 
+ */ + private void prepareFlushDataToStorage(List<Metrics> metricsList, + List<PrepareRequest> prepareRequests) { try { loadFromStorage(metricsList); long timestamp = System.currentTimeMillis(); for (Metrics metrics : metricsList) { - Metrics cachedMetrics = context.get(metrics); + Metrics cachedMetrics = sessionCache.get(metrics); if (cachedMetrics != null) { /* * If the metrics is not supportUpdate, defined through MetricsExtension#supportUpdate, @@ -233,12 +251,22 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { continue; } cachedMetrics.calculate(); - prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics)); + prepareRequests.add( + metricsDAO.prepareBatchUpdate( + model, + cachedMetrics, + new SessionCacheCallback(sessionCache, cachedMetrics) + )); nextWorker(cachedMetrics); cachedMetrics.setLastUpdateTimestamp(timestamp); } else { metrics.calculate(); - prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics)); + prepareRequests.add( + metricsDAO.prepareBatchInsert( + model, + metrics, + new SessionCacheCallback(sessionCache, metrics) + )); nextWorker(metrics); metrics.setLastUpdateTimestamp(timestamp); } @@ -271,7 +299,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { List<Metrics> notInCacheMetrics = metrics.stream() .filter(m -> { - final Metrics cachedValue = context.get(m); + final Metrics cachedValue = sessionCache.get(m); // Not cached or session disabled, the metric could be tagged `not in cache`. if (cachedValue == null || !enableDatabaseSession) { return true; @@ -286,7 +314,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { if (metricsDAO.isExpiredCache(model, cachedValue, currentTimeMillis, metricsDataTTL)) { // The expired metrics should be removed from the context and tagged `not in cache` directly. 
- context.remove(m); + sessionCache.remove(m); return true; } } @@ -301,9 +329,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { final List<Metrics> dbMetrics = metricsDAO.multiGet(model, notInCacheMetrics); if (!enableDatabaseSession) { // Clear the cache only after results from DB are returned successfully. - context.clear(); + sessionCache.clear(); } - dbMetrics.forEach(m -> context.put(m, m)); + dbMetrics.forEach(m -> sessionCache.put(m, m)); } catch (final Exception e) { log.error("Failed to load metrics for merging", e); } @@ -312,7 +340,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { @Override public void endOfRound() { if (enableDatabaseSession) { - Iterator<Metrics> iterator = context.values().iterator(); + Iterator<Metrics> iterator = sessionCache.values().iterator(); long timestamp = System.currentTimeMillis(); while (iterator.hasNext()) { Metrics metrics = iterator.next(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java index 4320a83cd9..1baee65e4c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java @@ -47,7 +47,7 @@ public interface IMetricsDAO extends DAO { * @return InsertRequest should follow the database client driver datatype, in order to make sure it could be * executed ASAP. */ - InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException; + InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException; /** * Transfer the given metrics to an executable update statement. 
@@ -55,7 +55,7 @@ public interface IMetricsDAO extends DAO { * @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be * executed ASAP. */ - UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException; + UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException; /** * Calculate the expired status of the metric by given current timestamp, metric and TTL. @@ -72,4 +72,5 @@ public interface IMetricsDAO extends DAO { // If the cached metric is older than the TTL indicated. return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl); } + } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java new file mode 100644 index 0000000000..923da32f72 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.oap.server.core.storage; + +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; + +/** + * SessionCacheCallback provides a bridge for storage implementations to update the session cache + */ +@RequiredArgsConstructor +public class SessionCacheCallback { + private final Map<Metrics, Metrics> sessionCache; + private final Metrics metrics; + /** + * In some cases, this callback could be shared by multiple executions, such as SQLExecutor#additionalSQLs. + * This flag makes sure that, once one of the generated executions fails, the whole metric is removed + * from the cache and is not added back. Those executions run in batch mode, so their order is uncertain. + */ + private volatile boolean isFailed = false; + + public void onInsertCompleted() { + if (isFailed) { + return; + } + sessionCache.put(metrics, metrics); + } + + public void onUpdateFailure() { + sessionCache.remove(metrics); + isFailed = true; + } +} diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java index 3e2cbf7ad0..e6bd63e28f 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java @@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest; @Getter public class IndexRequestWrapper implements InsertRequest { - private final IndexRequest request; + protected IndexRequest request; public IndexRequestWrapper(String index, String type, String id, Map<String, ?> source) { @@ 
-35,4 +35,15 @@ public class IndexRequestWrapper implements InsertRequest { .doc(source) .build(); } + + /** + * Expose an empty constructor to lazy initialization. + */ + protected IndexRequestWrapper() { + + } + + @Override + public void onInsertCompleted() { + } } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java index 3241aa84e4..dbd9c8c1d2 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java @@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; @Getter public class UpdateRequestWrapper implements UpdateRequest { - private final org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request; + protected org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request; public UpdateRequestWrapper(String index, String type, String id, Map<String, Object> source) { @@ -34,4 +34,16 @@ public class UpdateRequestWrapper implements UpdateRequest { .doc(source) .build(); } + + /** + * Expose an empty constructor to lazy initialization. 
+ */ + protected UpdateRequestWrapper() { + + } + + @Override + public void onUpdateFailure() { + + } } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java index a9dbdab1af..91ded6a487 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java @@ -18,4 +18,5 @@ package org.apache.skywalking.oap.server.library.client.request; public interface InsertRequest extends PrepareRequest { + void onInsertCompleted(); } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java index ea914913c9..2895216971 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java @@ -18,4 +18,5 @@ package org.apache.skywalking.oap.server.library.client.request; public interface UpdateRequest extends PrepareRequest { + void onUpdateFailure(); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java index 9564de27da..b3cd0461e0 100644 --- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java @@ -18,6 +18,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; import org.apache.skywalking.oap.server.core.storage.AbstractDAO; @@ -29,10 +32,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO { private static final Object STREAM_SYNCHRONIZER = new Object(); private static final Object MEASURE_SYNCHRONIZER = new Object(); @@ -69,9 +68,21 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme if (r instanceof BanyanDBStreamInsertRequest) { return getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite()); } else if (r instanceof BanyanDBMeasureInsertRequest) { - return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite()); + return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite()) + .whenComplete((v, throwable) -> { + if (throwable == null) { + // Insert completed + ((BanyanDBMeasureInsertRequest) r).onInsertCompleted(); + } + 
}); } else if (r instanceof BanyanDBMeasureUpdateRequest) { - return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite()); + return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite()) + .whenComplete((v, throwable) -> { + if (throwable != null) { + // Update failure + ((BanyanDBMeasureUpdateRequest) r).onUpdateFailure(); + } + }); } return CompletableFuture.completedFuture(null); }).toArray(CompletableFuture[]::new)); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java index 3f965bff6f..bbfe883372 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java @@ -21,10 +21,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.skywalking.banyandb.v1.client.MeasureWrite; +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; @RequiredArgsConstructor @Getter public class BanyanDBMeasureInsertRequest implements InsertRequest { private final MeasureWrite measureWrite; + private final SessionCacheCallback callback; + + @Override + public void onInsertCompleted() { + callback.onInsertCompleted(); + } } \ No newline at end of file diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java index 2ff1fae015..b39190aada 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java @@ -21,10 +21,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.skywalking.banyandb.v1.client.MeasureWrite; +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; @RequiredArgsConstructor @Getter public class BanyanDBMeasureUpdateRequest implements UpdateRequest { private final MeasureWrite measureWrite; + private final SessionCacheCallback callback; + + @Override + public void onUpdateFailure() { + callback.onUpdateFailure(); + } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index 0b24eb0e29..0ca07e8cfe 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java +++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -26,6 +26,7 @@ import org.apache.skywalking.banyandb.v1.client.MeasureWrite; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; @@ -77,7 +78,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD } @Override - public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException { + public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException { log.info("prepare to insert {}", model.getName()); MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); if (schema == null) { @@ -89,11 +90,11 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite); storageBuilder.entity2Storage(metrics, toStorage); toStorage.acceptID(metrics.id()); - return new BanyanDBMeasureInsertRequest(toStorage.obtain()); + return new BanyanDBMeasureInsertRequest(toStorage.obtain(), callback); } @Override - public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException { + public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException { log.info("prepare to update {}", model.getName()); MetadataRegistry.Schema schema = 
MetadataRegistry.INSTANCE.findMetadata(model); if (schema == null) { @@ -105,6 +106,6 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite); storageBuilder.entity2Storage(metrics, toStorage); toStorage.acceptID(metrics.id()); - return new BanyanDBMeasureUpdateRequest(toStorage.obtain()); + return new BanyanDBMeasureUpdateRequest(toStorage.obtain(), callback); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java index b04e4422d5..15dbc52c53 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java @@ -27,4 +27,9 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest; @Getter public class BanyanDBStreamInsertRequest implements InsertRequest { private final StreamWrite streamWrite; + + @Override + public void onInsertCompleted() { + + } } \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java index dc6c47bb19..76e44d67fd 100644 --- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java @@ -75,9 +75,21 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { if (CollectionUtils.isNotEmpty(prepareRequests)) { return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> { if (prepareRequest instanceof InsertRequest) { - return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest()); + return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest()) + .whenComplete((v, throwable) -> { + if (throwable == null) { + // Insert completed + ((IndexRequestWrapper) prepareRequest).onInsertCompleted(); + } + }); } else { - return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest()); + return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest()) + .whenComplete((v, throwable) -> { + if (throwable != null) { + // Update failure + ((UpdateRequestWrapper) prepareRequest).onUpdateFailure(); + } + }); } }).toArray(CompletableFuture[]::new)); } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java similarity index 50% copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java index 
3e2cbf7ad0..ce5cc6ec91 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java @@ -13,26 +13,29 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.skywalking.oap.server.library.client.elasticsearch; +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; + +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; +import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexRequestWrapper; -import java.util.Map; -import lombok.Getter; -import org.apache.skywalking.library.elasticsearch.requests.IndexRequest; -import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +/** + * MetricIndexRequestWrapper wraps the built request wrapper with a new callback. 
+ */ +public class MetricIndexRequestWrapper extends IndexRequestWrapper { + private final SessionCacheCallback callback; -@Getter -public class IndexRequestWrapper implements InsertRequest { - private final IndexRequest request; + public MetricIndexRequestWrapper(IndexRequestWrapper requestWrapper, SessionCacheCallback callback) { + this.request = requestWrapper.getRequest(); + this.callback = callback; + } - public IndexRequestWrapper(String index, String type, String id, - Map<String, ?> source) { - request = IndexRequest.builder() - .index(index) - .type(type) - .id(id) - .doc(source) - .build(); + @Override + public void onInsertCompleted() { + if (callback != null) { + callback.onInsertCompleted(); + } } } diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java similarity index 50% copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java index 3e2cbf7ad0..34216534b9 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java @@ -13,26 +13,29 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
+ * */ -package org.apache.skywalking.oap.server.library.client.elasticsearch; +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; + +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; +import org.apache.skywalking.oap.server.library.client.elasticsearch.UpdateRequestWrapper; -import java.util.Map; -import lombok.Getter; -import org.apache.skywalking.library.elasticsearch.requests.IndexRequest; -import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +/** + * MetricIndexUpdateWrapper wraps the built request wrapper with a new callback. + */ +public class MetricIndexUpdateWrapper extends UpdateRequestWrapper { + private final SessionCacheCallback callback; -@Getter -public class IndexRequestWrapper implements InsertRequest { - private final IndexRequest request; + public MetricIndexUpdateWrapper(UpdateRequestWrapper requestWrapper, SessionCacheCallback callback) { + this.request = requestWrapper.getRequest(); + this.callback = callback; + } - public IndexRequestWrapper(String index, String type, String id, - Map<String, ?> source) { - request = IndexRequest.builder() - .index(index) - .type(type) - .id(id) - .doc(source) - .build(); + @Override + public void onUpdateFailure() { + if (callback != null) { + callback.onUpdateFailure(); + } } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java index 0363669475..12319dae1a 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java +++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java @@ -32,6 +32,7 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; @@ -97,24 +98,24 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO { } @Override - public InsertRequest prepareBatchInsert(Model model, Metrics metrics) { + public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) { final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName()); storageBuilder.entity2Storage(metrics, toStorage); Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain()); String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket()); String id = IndexController.INSTANCE.generateDocId(model, metrics.id()); - return getClient().prepareInsert(modelName, id, builder); + return new MetricIndexRequestWrapper(getClient().prepareInsert(modelName, id, builder), callback); } @Override - public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) { + public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) { final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName()); storageBuilder.entity2Storage(metrics, toStorage); Map<String, Object> builder = 
IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain()); String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket()); String id = IndexController.INSTANCE.generateDocId(model, metrics.id()); - return getClient().prepareUpdate(modelName, id, builder); + return new MetricIndexUpdateWrapper(getClient().prepareUpdate(modelName, id, builder), callback); } @Override diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java index a3d4848463..4061ae7405 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java @@ -18,15 +18,17 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.List; /** * A Batch SQL executor. 
@@ -45,32 +47,59 @@ public class BatchSQLExecutor implements InsertRequest, UpdateRequest { return; } String sql = prepareRequests.get(0).toString(); + List<PrepareRequest> bulkRequest = new ArrayList<>(maxBatchSqlSize); try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) { int pendingCount = 0; for (int k = 0; k < prepareRequests.size(); k++) { SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k); sqlExecutor.setParameters(preparedStatement); preparedStatement.addBatch(); + bulkRequest.add(sqlExecutor); if (k > 0 && k % maxBatchSqlSize == 0) { - executeBatch(preparedStatement, maxBatchSqlSize, sql); + executeBatch(preparedStatement, maxBatchSqlSize, sql, bulkRequest); + bulkRequest.clear(); pendingCount = 0; } else { pendingCount++; } } if (pendingCount > 0) { - executeBatch(preparedStatement, pendingCount, sql); + executeBatch(preparedStatement, pendingCount, sql, bulkRequest); + bulkRequest.clear(); } } } - private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException { + private void executeBatch(PreparedStatement preparedStatement, + int pendingCount, + String sql, + List<PrepareRequest> bulkRequest) throws SQLException { long start = System.currentTimeMillis(); - preparedStatement.executeBatch(); + final int[] executeBatchResults = preparedStatement.executeBatch(); + boolean isInsert = bulkRequest.get(0) instanceof InsertRequest; + for (int i = 0; i < executeBatchResults.length; i++) { + if (executeBatchResults[i] == 1 && isInsert) { + // Insert successfully. + ((InsertRequest) bulkRequest.get(i)).onInsertCompleted(); + } else if (executeBatchResults[i] == 0 && !isInsert) { + // Update Failure. 
+ ((UpdateRequest) bulkRequest.get(i)).onUpdateFailure(); + } + } if (log.isDebugEnabled()) { long end = System.currentTimeMillis(); long cost = end - start; log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql); } } + + @Override + public void onInsertCompleted() { + throw new UnexpectedException("BatchSQLExecutor.onInsertCompleted should not be called"); + } + + @Override + public void onUpdateFailure() { + throw new UnexpectedException("BatchSQLExecutor.onUpdateFailure should not be called"); + } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java index 26c19ee276..266350eb45 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java @@ -25,34 +25,30 @@ import java.util.ArrayList; import java.util.List; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A SQL executor. 
*/ @EqualsAndHashCode(of = "sql") +@RequiredArgsConstructor +@Slf4j public class SQLExecutor implements InsertRequest, UpdateRequest { - - private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecutor.class); - - private String sql; - private List<Object> param; + private final String sql; + private final List<Object> param; + private final SessionCacheCallback callback; @Getter private List<SQLExecutor> additionalSQLs; - public SQLExecutor(String sql, List<Object> param) { - this.sql = sql; - this.param = param; - } - public void invoke(Connection connection) throws SQLException { PreparedStatement preparedStatement = connection.prepareStatement(sql); setParameters(preparedStatement); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param); + if (log.isDebugEnabled()) { + log.debug("execute sql in batch: {}, parameters: {}", sql, param); } preparedStatement.execute(); if (additionalSQLs != null) { @@ -79,4 +75,16 @@ public class SQLExecutor implements InsertRequest, UpdateRequest { } additionalSQLs.addAll(sqlExecutors); } + + @Override + public void onInsertCompleted() { + if (callback != null) + callback.onInsertCompleted(); + } + + @Override + public void onUpdateFailure() { + if (callback != null) + callback.onUpdateFailure(); + } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java index 50becff534..2aea8f6ef3 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java +++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java @@ -48,7 +48,7 @@ public class JDBCManagementDAO extends JDBCSQLExecutor implements IManagementDAO } SQLExecutor insertExecutor = getInsertExecutor(model.getName(), storageData, storageBuilder, - new HashMapConverter.ToStorage()); + new HashMapConverter.ToStorage(), null); insertExecutor.invoke(connection); } catch (IOException | SQLException e) { throw new IOException(e.getMessage(), e); diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java index 3294d80720..5170362df7 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; @@ -49,12 +50,12 @@ public class JDBCMetricsDAO extends JDBCSQLExecutor implements IMetricsDAO { } @Override - public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException { - return 
getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage()); + public SQLExecutor prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException { + return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage(), callback); } @Override - public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics) throws IOException { - return getUpdateExecutor(model.getName(), metrics, storageBuilder); + public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException { + return getUpdateExecutor(model.getName(), metrics, storageBuilder, callback); } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java index aaf903b76b..7a87059e08 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java @@ -41,7 +41,7 @@ public class JDBCNoneStreamDAO extends JDBCSQLExecutor implements INoneStreamDAO @Override public void insert(Model model, NoneStream noneStream) throws IOException { try (Connection connection = jdbcClient.getConnection()) { - SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage()); + SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage(), null); insertExecutor.invoke(connection); } catch (IOException | 
SQLException e) { throw new IOException(e.getMessage(), e); diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java index 1ca36a69fe..f382f746e1 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java @@ -35,6 +35,6 @@ public class JDBCRecordDAO extends JDBCSQLExecutor implements IRecordDAO { @Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { - return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage()); + return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage(), null); } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java index 12321e3629..c15f4768ae 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; import 
org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; @@ -116,7 +117,8 @@ public class JDBCSQLExecutor { protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics, StorageBuilder<T> storageBuilder, - Convert2Storage<Map<String, Object>> converter) throws IOException { + Convert2Storage<Map<String, Object>> converter, + SessionCacheCallback callback) throws IOException { Model model = TableMetaInfo.get(modelName); storageBuilder.entity2Storage(metrics, converter); Map<String, Object> objectMap = converter.obtain(); @@ -126,7 +128,7 @@ public class JDBCSQLExecutor { mainEntity.put(column.getColumnName().getName(), objectMap.get(column.getColumnName().getName())); }); SQLExecutor sqlExecutor = buildInsertExecutor( - modelName, model.getColumns(), metrics, mainEntity); + modelName, model.getColumns(), metrics, mainEntity, callback); //build additional table sql for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension() .getAdditionalTables() @@ -137,7 +139,7 @@ public class JDBCSQLExecutor { }); List<SQLExecutor> additionalSQLExecutors = buildAdditionalInsertExecutor( - additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity + additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity, callback ); sqlExecutor.appendAdditionalSQLs(additionalSQLExecutors); } @@ -147,7 +149,8 @@ public class JDBCSQLExecutor { private <T extends StorageData> SQLExecutor buildInsertExecutor(String tableName, List<ModelColumn> columns, T metrics, - Map<String, Object> objectMap) throws IOException { + Map<String, Object> objectMap, + SessionCacheCallback onCompleteCallback) throws IOException 
{ SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES"); List<Object> param = new ArrayList<>(); sqlBuilder.append("(?,"); @@ -169,13 +172,14 @@ public class JDBCSQLExecutor { } sqlBuilder.append(")"); - return new SQLExecutor(sqlBuilder.toString(), param); + return new SQLExecutor(sqlBuilder.toString(), param, onCompleteCallback); } private <T extends StorageData> List<SQLExecutor> buildAdditionalInsertExecutor(String tableName, List<ModelColumn> columns, T metrics, - Map<String, Object> objectMap) throws IOException { + Map<String, Object> objectMap, + SessionCacheCallback callback) throws IOException { List<SQLExecutor> sqlExecutors = new ArrayList<>(); SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES"); @@ -211,17 +215,18 @@ public class JDBCSQLExecutor { for (Object object : valueList) { List<Object> paramCopy = new ArrayList<>(param); paramCopy.set(position, object); - sqlExecutors.add(new SQLExecutor(sql, paramCopy)); + sqlExecutors.add(new SQLExecutor(sql, paramCopy, callback)); } } else { - sqlExecutors.add(new SQLExecutor(sql, param)); + sqlExecutors.add(new SQLExecutor(sql, param, callback)); } return sqlExecutors; } protected <T extends StorageData> SQLExecutor getUpdateExecutor(String modelName, T metrics, - StorageBuilder<T> storageBuilder) throws IOException { + StorageBuilder<T> storageBuilder, + SessionCacheCallback callback) throws IOException { final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage(); storageBuilder.entity2Storage(metrics, toStorage); Map<String, Object> objectMap = toStorage.obtain(); @@ -236,7 +241,8 @@ public class JDBCSQLExecutor { if (model.getSqlDBModelExtension().isShardingTable()) { SQLDatabaseModelExtension.Sharding sharding = model.getSqlDBModelExtension().getSharding().orElseThrow( () -> new UnexpectedException("Sharding should not be empty.")); - if (columnName.equals(sharding.getDataSourceShardingColumn()) || 
columnName.equals(sharding.getTableShardingColumn())) { + if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals( + sharding.getTableShardingColumn())) { continue; } } @@ -253,6 +259,6 @@ public class JDBCSQLExecutor { sqlBuilder.append(" WHERE id = ?"); param.add(metrics.id()); - return new SQLExecutor(sqlBuilder.toString(), param); + return new SQLExecutor(sqlBuilder.toString(), param, callback); } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java index 7097aad725..9267efd21c 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java @@ -102,7 +102,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe public TemplateChangeStatus addTemplate(final DashboardSetting setting) throws IOException { final UITemplate uiTemplate = setting.toEntity(); final SQLExecutor insertExecutor = getInsertExecutor( - UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage()); + UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage(), null); try (Connection connection = h2Client.getConnection()) { insertExecutor.invoke(connection); return TemplateChangeStatus.builder().status(true).id(setting.getId()).build(); @@ -135,7 +135,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe private TemplateChangeStatus 
executeUpdate(final UITemplate uiTemplate) throws IOException { final SQLExecutor updateExecutor = getUpdateExecutor( - UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder()); + UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), null); try (Connection connection = h2Client.getConnection()) { updateExecutor.invoke(connection); return TemplateChangeStatus.builder().status(true).id(uiTemplate.getTemplateId()).build(); diff --git a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java index c7ac504256..c9776758ec 100644 --- a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java +++ b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java @@ -642,7 +642,7 @@ public class ShardingIntegrationTest { .builder() .getDeclaredConstructor() .newInstance()); - jdbcMetricsDAO.prepareBatchInsert(model, metrics).invoke(conn); + jdbcMetricsDAO.prepareBatchInsert(model, metrics, null).invoke(conn); } } }
