This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cd1c3b0c0d8 PipelineImportSQLBuilder Caffeine to replace ConcurrentMap (#27251)
cd1c3b0c0d8 is described below

commit cd1c3b0c0d8c4e34242b6cf2754e24201c9bf3f9
Author: PAN <[email protected]>
AuthorDate: Wed Jul 19 12:31:26 2023 +0800

    PipelineImportSQLBuilder Caffeine to replace ConcurrentMap (#27251)
---
 kernel/data-pipeline/core/pom.xml                  |  4 +++
 .../sqlbuilder/PipelineImportSQLBuilder.java       | 32 +++++++++++++---------
 2 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/kernel/data-pipeline/core/pom.xml b/kernel/data-pipeline/core/pom.xml
index ca351465e7a..363046c1914 100644
--- a/kernel/data-pipeline/core/pom.xml
+++ b/kernel/data-pipeline/core/pom.xml
@@ -96,5 +96,9 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilder.java
index e5c296f1444..2291f793964 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineImportSQLBuilder.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
@@ -24,9 +26,8 @@ import org.apache.shardingsphere.infra.database.spi.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 
 import java.util.Collection;
+import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 /**
@@ -44,12 +45,17 @@ public final class PipelineImportSQLBuilder {
     
     private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
     
-    private final ConcurrentMap<String, String> sqlCacheMap;
+    private final Cache<String, String> sqlCache;
     
     public PipelineImportSQLBuilder(final DatabaseType databaseType) {
         dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
         sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
-        sqlCacheMap = new ConcurrentHashMap<>();
+        sqlCache = buildPipelineCache();
+    }
+    
+    private Cache<String, String> buildPipelineCache() {
+        Caffeine<Object, Object> result = Caffeine.newBuilder().initialCapacity(128).maximumSize(1024).softValues();
+        return result.build();
     }
     
     /**
@@ -61,11 +67,11 @@ public final class PipelineImportSQLBuilder {
      */
     public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
         String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
-        if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+        if (null == sqlCache.getIfPresent(sqlCacheKey)) {
             String insertMainClause = buildInsertMainClause(schemaName, dataRecord);
-            sqlCacheMap.put(sqlCacheKey, dialectSQLBuilder.buildInsertOnDuplicateClause(dataRecord).map(optional -> insertMainClause + " " + optional).orElse(insertMainClause));
+            sqlCache.put(sqlCacheKey, dialectSQLBuilder.buildInsertOnDuplicateClause(dataRecord).map(optional -> insertMainClause + " " + optional).orElse(insertMainClause));
         }
-        return sqlCacheMap.get(sqlCacheKey);
+        return sqlCache.getIfPresent(sqlCacheKey);
     }
     
     private String buildInsertMainClause(final String schemaName, final DataRecord dataRecord) {
@@ -84,13 +90,13 @@ public final class PipelineImportSQLBuilder {
      */
     public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
-        if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+        if (null == sqlCache.getIfPresent(sqlCacheKey)) {
             String updateMainClause = String.format("UPDATE %s SET %%s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
-            sqlCacheMap.put(sqlCacheKey, buildWhereClause(conditionColumns).map(optional -> updateMainClause + optional).orElse(updateMainClause));
+            sqlCache.put(sqlCacheKey, buildWhereClause(conditionColumns).map(optional -> updateMainClause + optional).orElse(updateMainClause));
         }
         Collection<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
         String updateSetClause = setColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(","));
-        return String.format(sqlCacheMap.get(sqlCacheKey), updateSetClause);
+        return String.format(Objects.requireNonNull(sqlCache.getIfPresent(sqlCacheKey)), updateSetClause);
     }
     
     /**
@@ -103,11 +109,11 @@ public final class PipelineImportSQLBuilder {
      */
     public String buildDeleteSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
-        if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+        if (null == sqlCache.getIfPresent(sqlCacheKey)) {
             String deleteMainClause = buildDeleteMainClause(schemaName, dataRecord);
-            sqlCacheMap.put(sqlCacheKey, buildWhereClause(conditionColumns).map(optional -> deleteMainClause + optional).orElse(deleteMainClause));
+            sqlCache.put(sqlCacheKey, buildWhereClause(conditionColumns).map(optional -> deleteMainClause + optional).orElse(deleteMainClause));
         }
-        return sqlCacheMap.get(sqlCacheKey);
+        return sqlCache.getIfPresent(sqlCacheKey);
     }
     
     private String buildDeleteMainClause(final String schemaName, final DataRecord dataRecord) {

Reply via email to