This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cd1c3b0c0d8 PipelineImportSQLBuilder Caffeine to replace ConcurrentMap
(#27251)
cd1c3b0c0d8 is described below
commit cd1c3b0c0d8c4e34242b6cf2754e24201c9bf3f9
Author: PAN <[email protected]>
AuthorDate: Wed Jul 19 12:31:26 2023 +0800
PipelineImportSQLBuilder Caffeine to replace ConcurrentMap (#27251)
---
kernel/data-pipeline/core/pom.xml | 4 +++
.../sqlbuilder/PipelineImportSQLBuilder.java | 32 +++++++++++++---------
2 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/kernel/data-pipeline/core/pom.xml
b/kernel/data-pipeline/core/pom.xml
index ca351465e7a..363046c1914 100644
--- a/kernel/data-pipeline/core/pom.xml
+++ b/kernel/data-pipeline/core/pom.xml
@@ -96,5 +96,9 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilder.java
index e5c296f1444..2291f793964 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilder.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
@@ -24,9 +26,8 @@ import
org.apache.shardingsphere.infra.database.spi.DatabaseType;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import java.util.Collection;
+import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
@@ -44,12 +45,17 @@ public final class PipelineImportSQLBuilder {
private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
- private final ConcurrentMap<String, String> sqlCacheMap;
+ private final Cache<String, String> sqlCache;
public PipelineImportSQLBuilder(final DatabaseType databaseType) {
dialectSQLBuilder =
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType);
sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
- sqlCacheMap = new ConcurrentHashMap<>();
+ sqlCache = buildPipelineCache();
+ }
+
+ private Cache<String, String> buildPipelineCache() {
+ Caffeine<Object, Object> result =
Caffeine.newBuilder().initialCapacity(128).maximumSize(1024).softValues();
+ return result.build();
}
/**
@@ -61,11 +67,11 @@ public final class PipelineImportSQLBuilder {
*/
public String buildInsertSQL(final String schemaName, final DataRecord
dataRecord) {
String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
- if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+ if (null == sqlCache.getIfPresent(sqlCacheKey)) {
String insertMainClause = buildInsertMainClause(schemaName,
dataRecord);
- sqlCacheMap.put(sqlCacheKey,
dialectSQLBuilder.buildInsertOnDuplicateClause(dataRecord).map(optional ->
insertMainClause + " " + optional).orElse(insertMainClause));
+ sqlCache.put(sqlCacheKey,
dialectSQLBuilder.buildInsertOnDuplicateClause(dataRecord).map(optional ->
insertMainClause + " " + optional).orElse(insertMainClause));
}
- return sqlCacheMap.get(sqlCacheKey);
+ return sqlCache.getIfPresent(sqlCacheKey);
}
private String buildInsertMainClause(final String schemaName, final
DataRecord dataRecord) {
@@ -84,13 +90,13 @@ public final class PipelineImportSQLBuilder {
*/
public String buildUpdateSQL(final String schemaName, final DataRecord
dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
- if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+ if (null == sqlCache.getIfPresent(sqlCacheKey)) {
String updateMainClause = String.format("UPDATE %s SET %%s",
sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
- sqlCacheMap.put(sqlCacheKey,
buildWhereClause(conditionColumns).map(optional -> updateMainClause +
optional).orElse(updateMainClause));
+ sqlCache.put(sqlCacheKey,
buildWhereClause(conditionColumns).map(optional -> updateMainClause +
optional).orElse(updateMainClause));
}
Collection<Column> setColumns =
dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
String updateSetClause = setColumns.stream().map(each ->
sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " =
?").collect(Collectors.joining(","));
- return String.format(sqlCacheMap.get(sqlCacheKey), updateSetClause);
+ return
String.format(Objects.requireNonNull(sqlCache.getIfPresent(sqlCacheKey)),
updateSetClause);
}
/**
@@ -103,11 +109,11 @@ public final class PipelineImportSQLBuilder {
*/
public String buildDeleteSQL(final String schemaName, final DataRecord
dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
- if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+ if (null == sqlCache.getIfPresent(sqlCacheKey)) {
String deleteMainClause = buildDeleteMainClause(schemaName,
dataRecord);
- sqlCacheMap.put(sqlCacheKey,
buildWhereClause(conditionColumns).map(optional -> deleteMainClause +
optional).orElse(deleteMainClause));
+ sqlCache.put(sqlCacheKey,
buildWhereClause(conditionColumns).map(optional -> deleteMainClause +
optional).orElse(deleteMainClause));
}
- return sqlCacheMap.get(sqlCacheKey);
+ return sqlCache.getIfPresent(sqlCacheKey);
}
private String buildDeleteMainClause(final String schemaName, final
DataRecord dataRecord) {