sandynz commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994526630


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;

Review Comment:
   Default chunk size could be greater, e.g. `100_000`.
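
   For context, a hedged usage sketch of how the property is overridden today (the setup around `init` is assumed for illustration; only the `chunk-size` key comes from this diff):
   ```java
   // Assumed setup; the default only applies when "chunk-size" is absent.
   Properties props = new Properties();
   props.setProperty("chunk-size", "100000");
   CRC32MatchDataConsistencyCalculateAlgorithm algorithm = new CRC32MatchDataConsistencyCalculateAlgorithm();
   algorithm.init(props);
   ```
   So a larger default mainly benefits users who never tune the property.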



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }
+            return maxUniqueKeyValue;
+        } catch (final SQLException ex) {
+            log.error("get max unique key value failed", ex);
+            throw new PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
+        }
+    }
+    
+    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName, final Object beginId, final Object endId) {
         String logicTableName = parameter.getLogicTableName();
         String schemaName = parameter.getSchemaName();
-        Optional<String> sql = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName);
-        ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
-        return calculateCRC32(parameter.getDataSource(), logicTableName, sql.get());
+        String cacheKey = "crc32-" + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.get(cacheKey);
+        if (null == sql) {
+            Optional<String> optional = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName, parameter.getUniqueKey().getName());
+            ShardingSpherePreconditions.checkState(optional.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
+            sql = optional.get();
+            sqlCache.put(cacheKey, sql);
+        }
+        return calculateCRC32(parameter.getDataSource(), logicTableName, sql, beginId, endId);
     }
     
-    private CalculatedItem calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql) {
+    private CalculatedItem calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql, final Object beginId, final Object endId) {
         try (

Review Comment:
   The second `calculateCRC32` method could be merged into the first one
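
   A hedged sketch of the merge (the second method's body is truncated in this hunk, so the JDBC part below is assumed; the `CalculatedItem` constructor arguments and the begin/end bind order are illustrative, not the actual implementation):
   ```java
   private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter,
                                         final String columnName, final Object beginId, final Object endId) {
       String logicTableName = parameter.getLogicTableName();
       // getQuerySQL is the extraction suggested in the comment below.
       String sql = getQuerySQL(sqlBuilder, parameter, columnName);
       try (
               Connection connection = parameter.getDataSource().getConnection();
               PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
           preparedStatement.setObject(1, beginId);
           preparedStatement.setObject(2, endId);
           try (ResultSet resultSet = preparedStatement.executeQuery()) {
               resultSet.next();
               // Assumed result layout: CRC32 in column 1, records count in column 2.
               return new CalculatedItem(resultSet.getInt(2), resultSet.getLong(1));
           }
       } catch (final SQLException ex) {
           log.error("calculate CRC32 failed", ex);
           throw new PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
       }
   }
   ```
   That removes the single-use overload and keeps SQL lookup and execution in one place.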



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }

Review Comment:
   Iterating the whole result set just to get the maximum unique key value is not efficient enough. Could we query `MAX(uniqueKey)` in SQL instead?
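
   A hedged sketch of what the later-chunk SQL could look like instead (one MySQL-compatible option; table and key names are format placeholders as in the existing builders):
   ```java
   // Let the database compute the chunk's max in one round trip instead of
   // streaming every key back and iterating client-side. Bind order stays the
   // same: parameter 1 = previous max value, parameter 2 = chunk size.
   return String.format("SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE %s > ? ORDER BY %s ASC LIMIT ?) t",
           uniqueKey, uniqueKey, tableName, uniqueKey, uniqueKey);
   ```
   With that, `getMaxUniqueKeyValue` would need only a single `resultSet.next()` and `getObject(1)`.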



##########
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java:
##########
@@ -76,6 +76,11 @@ public String buildChunkedQuerySQL(final String schemaName, final String tableNa
         return "";
     }
     
+    @Override
+    public String buildChunkedQueryUniqueKeySQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
+        return String.format("SELECT %s FROM %s ORDER BY %s ASC LIMIT ?", uniqueKey, tableName, uniqueKey);
+    }

Review Comment:
   If it's not necessary to return real SQL here, we could remove it; otherwise it needs to be updated.
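
   If it does need to return real SQL, a hedged sketch that honors the `firstQuery` flag, matching the two bind layouts the algorithm uses (`LIMIT ?` for the first chunk, `> ? ... LIMIT ?` afterwards):
   ```java
   @Override
   public String buildChunkedQueryUniqueKeySQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
       // First chunk binds only the chunk size; later chunks bind the previous max value, then the chunk size.
       return firstQuery
               ? String.format("SELECT %s FROM %s ORDER BY %s ASC LIMIT ?", uniqueKey, tableName, uniqueKey)
               : String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s ASC LIMIT ?", uniqueKey, tableName, uniqueKey, uniqueKey);
   }
   ```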



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java:
##########
@@ -138,6 +138,17 @@ default Optional<String> buildCreateSchemaSQL(String schemaName) {
      */
     String buildChunkedQuerySQL(String schemaName, String tableName, String uniqueKey, boolean firstQuery);
     
+    /**
+     * Build query unique key SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param uniqueKey unique key, it may be primary key, not null
+     * @param firstQuery first query
+     * @return query unique key SQL
+     */
+    String buildChunkedQueryUniqueKeySQL(String schemaName, String tableName, String uniqueKey, boolean firstQuery);

Review Comment:
   The method name could be improved, e.g. `getMaxUniqueKeyValue`.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
  * CRC32 match data consistency calculate algorithm.
  */
 @Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    private static final String CHUNK_SIZE_KEY = "chunk-size";
+    
+    private static final int DEFAULT_CHUNK_SIZE = 5000;
+    
+    private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+    
     @Getter
     private Properties props;
     
+    private int chunkSize;
+    
     @Override
     public void init(final Properties props) {
         this.props = props;
+        chunkSize = getChunkSize(props);
+    }
+    
+    private int getChunkSize(final Properties props) {
+        int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, DEFAULT_CHUNK_SIZE + ""));
+        if (result <= 0) {
+            log.warn("Invalid result={}, use default value", result);
+            return DEFAULT_CHUNK_SIZE;
+        }
+        return result;
     }
     
     @Override
-    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
-        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+        PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        Object beginId;
+        if (null == previousCalculatedResult) {
+            beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+        } else {
+            beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+        }
+        Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+        if (null == endId) {
+            return Optional.empty();
+        }
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each, beginId, endId)).collect(Collectors.toList());
+        int recordsCount = calculatedItems.get(0).getRecordsCount();
+        return Optional.of(new CalculatedResult(endId, recordsCount, calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+    }
+    
+    private Object getBeginIdFromUniqueKey(final int columnType) {
+        if (PipelineJdbcUtils.isStringColumn(columnType)) {
+            return "!";
+        } else {
+            return Integer.MIN_VALUE;
+        }
     }
     
-    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter) {
+        String schemaName = parameter.getSchemaName();
+        String logicTableName = parameter.getLogicTableName();
+        String cacheKeyPrefix = "uniqueKey-" + (null == parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+        String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName, parameter.getUniqueKey().getName(),
+                null == parameter.getPreviousCalculatedResult()));
+        CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+        try (
+                Connection connection = parameter.getDataSource().getConnection();
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
+            preparedStatement.setFetchSize(chunkSize);
+            if (null == previousCalculatedResult) {
+                preparedStatement.setInt(1, chunkSize);
+            } else {
+                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue());
+                preparedStatement.setInt(2, chunkSize);
+            }
+            Object maxUniqueKeyValue = null;
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                while (resultSet.next()) {
+                    maxUniqueKeyValue = resultSet.getObject(1);
+                }
+            }
+            return maxUniqueKeyValue;
+        } catch (final SQLException ex) {
+            log.error("get max unique key value failed", ex);
+            throw new PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
+        }
+    }
+    
+    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName, final Object beginId, final Object endId) {
         String logicTableName = parameter.getLogicTableName();
         String schemaName = parameter.getSchemaName();
-        Optional<String> sql = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName);
-        ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
-        return calculateCRC32(parameter.getDataSource(), logicTableName, sql.get());
+        String cacheKey = "crc32-" + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
+        String sql = sqlCache.get(cacheKey);
+        if (null == sql) {
+            Optional<String> optional = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName, parameter.getUniqueKey().getName());
+            ShardingSpherePreconditions.checkState(optional.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
+            sql = optional.get();
+            sqlCache.put(cacheKey, sql);
+        }

Review Comment:
   This lookup-or-build logic could be extracted into a `getQuerySQL` method.
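
   A hedged sketch of the extraction (identifiers follow this diff; the exact signature is an assumption):
   ```java
   private String getQuerySQL(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
       String schemaName = parameter.getSchemaName();
       String logicTableName = parameter.getLogicTableName();
       String cacheKey = "crc32-" + parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
               ? schemaName + "." + logicTableName
               : logicTableName);
       // computeIfAbsent also closes the small race between get() and put() on the shared cache.
       return sqlCache.computeIfAbsent(cacheKey, unused -> {
           Optional<String> optional = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName, parameter.getUniqueKey().getName());
           ShardingSpherePreconditions.checkState(optional.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
           return optional.get();
       });
   }
   ```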



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
