azexcy commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r994593114
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -37,51 +40,137 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* CRC32 match data consistency calculate algorithm.
*/
@Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends
AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends
AbstractStreamingDataConsistencyCalculateAlgorithm {
private static final Collection<String> SUPPORTED_DATABASE_TYPES =
Collections.singletonList(new MySQLDatabaseType().getType());
+ private static final String CHUNK_SIZE_KEY = "chunk-size";
+
+ private static final int DEFAULT_CHUNK_SIZE = 5000;
+
+ private final Map<String, String> sqlCache = new ConcurrentHashMap<>();
+
@Getter
private Properties props;
+ private int chunkSize;
+
@Override
public void init(final Properties props) {
this.props = props;
+ chunkSize = getChunkSize(props);
+ }
+
+ private int getChunkSize(final Properties props) {
+ int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY,
DEFAULT_CHUNK_SIZE + ""));
+ if (result <= 0) {
+ log.warn("Invalid result={}, use default value", result);
+ return DEFAULT_CHUNK_SIZE;
+ }
+ return result;
}
@Override
- public Iterable<DataConsistencyCalculatedResult> calculate(final
DataConsistencyCalculateParameter parameter) {
+ protected Optional<DataConsistencyCalculatedResult> calculateChunk(final
DataConsistencyCalculateParameter parameter) {
PipelineSQLBuilder sqlBuilder =
PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
- List<CalculatedItem> calculatedItems =
parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder,
parameter, each)).collect(Collectors.toList());
- return Collections.singletonList(new
CalculatedResult(calculatedItems.get(0).getRecordsCount(),
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+ PipelineColumnMetaData uniqueKey = parameter.getUniqueKey();
+ CalculatedResult previousCalculatedResult = (CalculatedResult)
parameter.getPreviousCalculatedResult();
+ Object beginId;
+ if (null == previousCalculatedResult) {
+ beginId = getBeginIdFromUniqueKey(uniqueKey.getDataType());
+ } else {
+ beginId = previousCalculatedResult.getMaxUniqueKeyValue();
+ }
+ Object endId = getMaxUniqueKeyValue(sqlBuilder, parameter);
+ if (null == endId) {
+ return Optional.empty();
+ }
+ List<CalculatedItem> calculatedItems =
parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder,
parameter, each, beginId, endId)).collect(Collectors.toList());
+ int recordsCount = calculatedItems.get(0).getRecordsCount();
+ return Optional.of(new CalculatedResult(endId, recordsCount,
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+ }
+
+ private Object getBeginIdFromUniqueKey(final int columnType) {
+ if (PipelineJdbcUtils.isStringColumn(columnType)) {
+ return "!";
+ } else {
+ return Integer.MIN_VALUE;
+ }
}
- private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder,
final DataConsistencyCalculateParameter parameter, final String columnName) {
+ private Object getMaxUniqueKeyValue(final PipelineSQLBuilder sqlBuilder,
final DataConsistencyCalculateParameter parameter) {
+ String schemaName = parameter.getSchemaName();
+ String logicTableName = parameter.getLogicTableName();
+ String cacheKeyPrefix = "uniqueKey-" + (null ==
parameter.getPreviousCalculatedResult() ? "first" : "later") + "-";
+ String cacheKey = cacheKeyPrefix + parameter.getDatabaseType() + "-" +
(null != schemaName &&
DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+ ? schemaName + "." + logicTableName
+ : logicTableName);
+ String sql = sqlCache.computeIfAbsent(cacheKey, s ->
sqlBuilder.buildChunkedQueryUniqueKeySQL(schemaName, logicTableName,
parameter.getUniqueKey().getName(),
+ null == parameter.getPreviousCalculatedResult()));
+ CalculatedResult previousCalculatedResult = (CalculatedResult)
parameter.getPreviousCalculatedResult();
+ try (
+ Connection connection =
parameter.getDataSource().getConnection();
+ PreparedStatement preparedStatement =
setCurrentStatement(connection.prepareStatement(sql))) {
+ preparedStatement.setFetchSize(chunkSize);
+ if (null == previousCalculatedResult) {
+ preparedStatement.setInt(1, chunkSize);
+ } else {
+ preparedStatement.setObject(1,
previousCalculatedResult.getMaxUniqueKeyValue());
+ preparedStatement.setInt(2, chunkSize);
+ }
+ Object maxUniqueKeyValue = null;
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ while (resultSet.next()) {
+ maxUniqueKeyValue = resultSet.getObject(1);
+ }
+ }
Review Comment:
In `SELECT max(id) FROM t_order ORDER BY id ASC LIMIT 1`, the `LIMIT` has no
effect when `max()` is used (the aggregate already collapses the result to a
single row), whereas the subquery form `SELECT MAX(id) FROM (SELECT id FROM
t_order ORDER BY id ASC LIMIT 1) t;` behaves differently now
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]