This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b367f689f8 Refactor DATA_MATCH consistency check by one query (#24075)
7b367f689f8 is described below

commit 7b367f689f8aeaa2767eaa73c83a5d64adf2d48b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Feb 9 17:32:23 2023 +0800

    Refactor DATA_MATCH consistency check by one query (#24075)
    
    * Remove unused fields
    
    * Refactor DATA_MATCH consistency check by one query
---
 .../DataConsistencyCalculateParameter.java         |  16 +--
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |   4 +-
 ...SingleTableInventoryDataConsistencyChecker.java |  26 +++-
 ...StreamingDataConsistencyCalculateAlgorithm.java |   1 -
 ...DataMatchDataConsistencyCalculateAlgorithm.java | 144 ++++++++++++++-------
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |   7 +-
 .../data/pipeline/core/util/CloseUtil.java         |  45 +++++++
 .../fixture/FixturePipelineSQLBuilder.java         |   2 +-
 ...MatchDataConsistencyCalculateAlgorithmTest.java |  32 ++---
 9 files changed, 189 insertions(+), 88 deletions(-)
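
In short: instead of issuing a new LIMIT-bounded query for every chunk, the DATA_MATCH algorithm now opens a single ordered query per table, keeps the Connection/PreparedStatement/ResultSet alive in a reusable calculation context, and reads up to chunkSize rows per calculateChunk() call, relying on setFetchSize to stream results. A minimal, self-contained sketch of that pattern (illustrative only; the ChunkReader name and constructor arguments are not part of this commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the "one query, many chunks" pattern: run one ordered query,
// stream it, and slice it into chunks on the client side instead of
// re-querying with LIMIT for every chunk.
public final class ChunkReader implements AutoCloseable {
    
    private final Connection connection;
    
    private final PreparedStatement statement;
    
    private final ResultSet resultSet;
    
    private final int chunkSize;
    
    public ChunkReader(final String jdbcUrl, final String table, final String uniqueKey, final int chunkSize) throws SQLException {
        this.chunkSize = chunkSize;
        connection = DriverManager.getConnection(jdbcUrl);
        // One ordered query for the whole table; chunking happens client-side.
        statement = connection.prepareStatement(String.format("SELECT * FROM %s ORDER BY %s ASC", table, uniqueKey));
        // Hint the driver to stream rows instead of materializing the whole result.
        statement.setFetchSize(chunkSize);
        resultSet = statement.executeQuery();
    }
    
    // Read the next chunk; an empty list means the result set is exhausted.
    public List<Object[]> nextChunk() throws SQLException {
        List<Object[]> result = new ArrayList<>(chunkSize);
        int columnCount = resultSet.getMetaData().getColumnCount();
        while (result.size() < chunkSize && resultSet.next()) {
            Object[] row = new Object[columnCount];
            for (int i = 1; i <= columnCount; i++) {
                row[i - 1] = resultSet.getObject(i);
            }
            result.add(row);
        }
        return result;
    }
    
    @Override
    public void close() throws SQLException {
        resultSet.close();
        statement.close();
        connection.close();
    }
}

Because the JDBC resources now stay open across chunks, the commit also introduces an AutoCloseable calculation context and a CloseUtil helper so callers can release them on failure or exhaustion, as the diffs below show.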

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 04d9c757adf..9fb97eb3f3a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.api.check.consistency;
 
-import com.google.common.collect.Range;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
@@ -59,20 +58,9 @@ public final class DataConsistencyCalculateParameter {
     private final PipelineColumnMetaData uniqueKey;
     
     /**
-     * Used for range query.
+     * Calculation context.
      */
-    private volatile Range<? extends Comparable<?>> uniqueKeyValueRange;
-    
-    /**
-     * Used for multiple records query.
-     * If it's configured, then it could be translated to SQL like "uniqueKey IN (value1,value2,value3)".
-     */
-    private volatile Collection<Object> uniqueKeyValues;
-    
-    /**
-     * Previous calculated result will be transferred to next call.
-     */
-    private volatile Object previousCalculatedResult;
+    private volatile AutoCloseable calculationContext;
     
     private final Object tableCheckPosition;
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 8195e5eea11..5406f5cbc8f 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -130,7 +130,7 @@ public interface PipelineSQLBuilder extends TypedSPI {
     String buildCountSQL(String schemaName, String tableName);
     
     /**
-     * Build query SQL.
+     * Build query all ordering SQL.
      *
      * @param schemaName schema name
      * @param tableName table name
@@ -138,7 +138,7 @@ public interface PipelineSQLBuilder extends TypedSPI {
      * @param firstQuery first query
      * @return query SQL
      */
-    String buildChunkedQuerySQL(String schemaName, String tableName, String uniqueKey, boolean firstQuery);
+    String buildQueryAllOrderingSQL(String schemaName, String tableName, String uniqueKey, boolean firstQuery);
     
     /**
      * Build check empty SQL.
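
Per the AbstractPipelineSQLBuilder change further down, the renamed method drops the LIMIT placeholder and keeps only the resume-position predicate. Ignoring dialect-specific identifier quoting, and borrowing the sample table and key from the test at the end of this diff, it produces roughly:

    // Hypothetical usage; "t_order" and "order_id" come from the test fixture below,
    // and sqlBuilder stands for a PipelineSQLBuilder of the relevant dialect.
    String firstSql = sqlBuilder.buildQueryAllOrderingSQL(null, "t_order", "order_id", true);
    // -> SELECT * FROM t_order ORDER BY order_id ASC
    String resumeSql = sqlBuilder.buildQueryAllOrderingSQL(null, "t_order", "order_id", false);
    // -> SELECT * FROM t_order WHERE order_id>? ORDER BY order_id ASC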
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 060421536fd..7f4461394ec 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -109,6 +110,23 @@ public final class SingleTableInventoryDataConsistencyChecker {
                 columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey, tableCheckPositions.get(targetTableName));
         Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParam).iterator();
         Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParam).iterator();
+        try {
+            return check0(sourceCalculatedResults, targetCalculatedResults, executor);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            if (null != sourceParam.getCalculationContext()) {
+                CloseUtil.closeQuietly(sourceParam.getCalculationContext());
+            }
+            if (null != targetParam.getCalculationContext()) {
+                CloseUtil.closeQuietly(targetParam.getCalculationContext());
+            }
+            throw ex;
+        }
+    }
+    
+    private DataConsistencyCheckResult check0(final Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults, final Iterator<DataConsistencyCalculatedResult> targetCalculatedResults,
+                                              final ThreadPoolExecutor executor) {
         long sourceRecordsCount = 0;
         long targetRecordsCount = 0;
         boolean contentMatched = true;
@@ -128,10 +146,10 @@ public final class SingleTableInventoryDataConsistencyChecker {
                 break;
             }
             if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                tableCheckPositions.put(sourceTableName, sourceCalculatedResult.getMaxUniqueKeyValue().get());
+                progressContext.getTableCheckPositions().put(sourceTable.getTableName().getOriginal(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                tableCheckPositions.put(targetTableName, targetCalculatedResult.getMaxUniqueKeyValue().get());
+                progressContext.getTableCheckPositions().put(targetTable.getTableName().getOriginal(), targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
             progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
         }
@@ -146,8 +164,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
     private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource,
                                                              final String schemaName, final String tableName, final Collection<String> columnNames,
                                                              final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey,
-                                                             final Object tableCheckPositionValue) {
-        return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPositionValue);
+                                                             final Object tableCheckPosition) {
+        return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey, tableCheckPosition);
     }
     
     private <T> T waitFuture(final Future<T> future) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index e0600b895bb..37797fc8a57 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -78,7 +78,6 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm extends
         public DataConsistencyCalculatedResult next() {
             calculateIfNecessary();
             Optional<DataConsistencyCalculatedResult> nextResult = this.nextResult;
-            param.setPreviousCalculatedResult(nextResult.orElse(null));
             this.nextResult = null;
             return nextResult.orElse(null);
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index f1e9c0f100b..7a1ee87528f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -27,12 +28,14 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.math.BigDecimal;
@@ -45,11 +48,9 @@ import java.sql.SQLXML;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -68,10 +69,6 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     
     private int chunkSize;
     
-    private final Map<String, String> firstSQLCache = new ConcurrentHashMap<>();
-    
-    private final Map<String, String> laterSQLCache = new ConcurrentHashMap<>();
-    
     @Override
     public void init(final Properties props) {
         chunkSize = getChunkSize(props);
@@ -88,46 +85,84 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     
     @Override
     public Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter param) {
-        CalculatedResult previousCalculatedResult = (CalculatedResult) param.getPreviousCalculatedResult();
-        String sql = getQuerySQL(param);
-        try (
-                Connection connection = param.getDataSource().getConnection();
-                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
-            preparedStatement.setFetchSize(chunkSize);
-            Object tableCheckPosition = param.getTableCheckPosition();
-            if (null == previousCalculatedResult) {
-                if (null == tableCheckPosition) {
-                    preparedStatement.setInt(1, chunkSize);
-                } else {
-                    preparedStatement.setObject(1, tableCheckPosition);
-                    preparedStatement.setInt(2, chunkSize);
-                }
-            } else {
-                preparedStatement.setObject(1, previousCalculatedResult.getMaxUniqueKeyValue().orElse(null));
-                preparedStatement.setInt(2, chunkSize);
-            }
+        CalculationContext calculationContext = getOrCreateCalculationContext(param);
+        try {
             Collection<Collection<Object>> records = new LinkedList<>();
             Object maxUniqueKeyValue = null;
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                ColumnValueReader columnValueReader = TypedSPILoader.getService(ColumnValueReader.class, param.getDatabaseType());
-                while (resultSet.next()) {
-                    if (isCanceling()) {
-                        throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName());
-                    }
-                    ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-                    int columnCount = resultSetMetaData.getColumnCount();
-                    Collection<Object> record = new LinkedList<>();
-                    for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
-                        record.add(columnValueReader.readValue(resultSet, resultSetMetaData, columnIndex));
-                    }
-                    records.add(record);
-                    maxUniqueKeyValue = columnValueReader.readValue(resultSet, resultSetMetaData, param.getUniqueKey().getOrdinalPosition());
+            ColumnValueReader columnValueReader = TypedSPILoader.getService(ColumnValueReader.class, param.getDatabaseType());
+            ResultSet resultSet = calculationContext.getResultSet();
+            while (resultSet.next()) {
+                if (isCanceling()) {
+                    throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName());
+                }
+                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+                int columnCount = resultSetMetaData.getColumnCount();
+                Collection<Object> record = new LinkedList<>();
+                for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+                    record.add(columnValueReader.readValue(resultSet, resultSetMetaData, columnIndex));
                 }
+                records.add(record);
+                maxUniqueKeyValue = columnValueReader.readValue(resultSet, resultSetMetaData, param.getUniqueKey().getOrdinalPosition());
+                if (records.size() == chunkSize) {
+                    break;
+                }
+            }
+            if (records.isEmpty()) {
+                calculationContext.close();
             }
             return records.isEmpty() ? Optional.empty() : Optional.of(new CalculatedResult(maxUniqueKeyValue, records.size(), records));
-        } catch (final SQLException ex) {
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            calculationContext.close();
+            if (ex instanceof PipelineSQLException) {
+                throw (PipelineSQLException) ex;
+            }
+            throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), ex);
+        }
+    }
+    
+    private CalculationContext getOrCreateCalculationContext(final DataConsistencyCalculateParameter param) {
+        CalculationContext result = (CalculationContext) param.getCalculationContext();
+        if (null != result) {
+            return result;
+        }
+        try {
+            result = createCalculationContext(param);
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), ex);
+        }
+        try {
+            fulfillCalculationContext(result, param);
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            result.close();
             throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), ex);
         }
+        return result;
+    }
+    
+    private CalculationContext createCalculationContext(final DataConsistencyCalculateParameter param) throws SQLException {
+        Connection connection = param.getDataSource().getConnection();
+        CalculationContext result = new CalculationContext(connection);
+        param.setCalculationContext(result);
+        return result;
+    }
+    
+    private void fulfillCalculationContext(final CalculationContext calculationContext, final DataConsistencyCalculateParameter param) throws SQLException {
+        String sql = getQuerySQL(param);
+        PreparedStatement preparedStatement = setCurrentStatement(calculationContext.getConnection().prepareStatement(sql));
+        calculationContext.setPreparedStatement(preparedStatement);
+        preparedStatement.setFetchSize(chunkSize);
+        Object tableCheckPosition = param.getTableCheckPosition();
+        if (null != tableCheckPosition) {
+            preparedStatement.setObject(1, tableCheckPosition);
+        }
+        ResultSet resultSet = preparedStatement.executeQuery();
+        calculationContext.setResultSet(resultSet);
     }
     
     private String getQuerySQL(final DataConsistencyCalculateParameter param) {
@@ -135,13 +170,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         String logicTableName = param.getLogicTableName();
         String schemaName = param.getSchemaName();
         String uniqueKey = param.getUniqueKey().getName();
-        String cacheKey = param.getDatabaseType() + "-" + (null != schemaName && TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType()).isSchemaAvailable()
-                ? schemaName + "." + logicTableName
-                : logicTableName);
-        if (null == param.getPreviousCalculatedResult() && null == param.getTableCheckPosition()) {
-            return firstSQLCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
+        if (null != param.getTableCheckPosition()) {
+            return sqlBuilder.buildQueryAllOrderingSQL(schemaName, logicTableName, uniqueKey, false);
         }
-        return laterSQLCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, false));
+        return sqlBuilder.buildQueryAllOrderingSQL(schemaName, logicTableName, uniqueKey, true);
     }
     
     @Override
@@ -154,6 +186,26 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         return SUPPORTED_DATABASE_TYPES;
     }
     
+    @RequiredArgsConstructor
+    @Getter
+    private static final class CalculationContext implements AutoCloseable {
+        
+        private final Connection connection;
+        
+        @Setter
+        private volatile PreparedStatement preparedStatement;
+        
+        @Setter
+        private volatile ResultSet resultSet;
+        
+        @Override
+        public void close() {
+            CloseUtil.closeQuietly(resultSet);
+            CloseUtil.closeQuietly(preparedStatement);
+            CloseUtil.closeQuietly(connection);
+        }
+    }
+    
     @RequiredArgsConstructor
     @Getter
     static final class CalculatedResult implements DataConsistencyCalculatedResult {
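
Lifecycle of the new CalculationContext: it is created lazily on the first calculateChunk() call, stored on the parameter so subsequent chunks reuse the same open ResultSet, closed by the algorithm when the result set is exhausted or a chunk fails, and closed by the checker (see SingleTableInventoryDataConsistencyChecker above) if the comparison loop throws. A caller-side sketch of that contract (illustrative only; algorithm and param stand for the real objects):

    Iterator<DataConsistencyCalculatedResult> results = algorithm.calculate(param).iterator();
    try {
        while (results.hasNext()) {
            DataConsistencyCalculatedResult chunk = results.next();
            // compare the chunk against the other side here
        }
    } catch (final RuntimeException ex) {
        // The open Connection/PreparedStatement/ResultSet live in the calculation context,
        // so abandoning the iteration mid-stream requires closing it explicitly.
        CloseUtil.closeQuietly(param.getCalculationContext());
        throw ex;
    }
    // On normal exhaustion the algorithm closes the context itself (records.isEmpty() branch).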
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 991ac270e50..8928e401973 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
 import com.google.common.base.Strings;
-import lombok.NonNull;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -171,12 +170,12 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildChunkedQuerySQL(final String schemaName, final @NonNull String tableName, final @NonNull String uniqueKey, final boolean firstQuery) {
+    public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = quote(uniqueKey);
         return firstQuery
-                ? String.format("SELECT * FROM %s ORDER BY %s ASC LIMIT ?", qualifiedTableName, quotedUniqueKey)
-                : String.format("SELECT * FROM %s WHERE %s>? ORDER BY %s ASC LIMIT ?", qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
+                ? String.format("SELECT * FROM %s ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey)
+                : String.format("SELECT * FROM %s WHERE %s>? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/CloseUtil.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/CloseUtil.java
new file mode 100644
index 00000000000..1d4fe97f5be
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/CloseUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+/**
+ * Close util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CloseUtil {
+    
+    /**
+     * Close quietly.
+     *
+     * @param closeable closeable
+     */
+    public static void closeQuietly(final AutoCloseable closeable) {
+        if (null == closeable) {
+            return;
+        }
+        try {
+            closeable.close();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ignored) {
+            // CHECKSTYLE:ON
+        }
+    }
+}
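
CloseUtil.closeQuietly swallows any exception thrown during close, so cleanup in a catch or finally path never masks the original error. A small usage sketch (not part of the commit; the class and method names here are illustrative):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
    
    // Illustrative helper: best-effort cleanup in a finally block.
    public final class CloseUtilUsageExample {
        
        public static int countRows(final Connection connection, final String sql) throws SQLException {
            PreparedStatement statement = null;
            ResultSet resultSet = null;
            try {
                statement = connection.prepareStatement(sql);
                resultSet = statement.executeQuery();
                int result = 0;
                while (resultSet.next()) {
                    result++;
                }
                return result;
            } finally {
                CloseUtil.closeQuietly(resultSet);
                CloseUtil.closeQuietly(statement);
            }
        }
    }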
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index b33a83d70ac..3db2baad3f5 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -69,7 +69,7 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildChunkedQuerySQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
+    public String buildQueryAllOrderingSQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
         return "";
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
index d41de94a18d..de667ecb34a 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
@@ -24,8 +24,8 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.internal.configuration.plugins.Plugins;
 
@@ -42,19 +42,25 @@ import static org.junit.Assert.assertTrue;
 
 public final class DataMatchDataConsistencyCalculateAlgorithmTest {
     
-    private PipelineDataSourceWrapper source;
+    private static PipelineDataSourceWrapper source;
     
-    private PipelineDataSourceWrapper target;
+    private static PipelineDataSourceWrapper target;
     
-    @Before
-    public void setUp() throws Exception {
+    @BeforeClass
+    public static void setUp() throws Exception {
         source = new PipelineDataSourceWrapper(createHikariDataSource("source_ds"), new H2DatabaseType());
         createTableAndInitData(source, "t_order_copy");
         target = new PipelineDataSourceWrapper(createHikariDataSource("target_ds"), new H2DatabaseType());
         createTableAndInitData(target, "t_order");
     }
     
-    private HikariDataSource createHikariDataSource(final String databaseName) {
+    @AfterClass
+    public static void tearDown() throws Exception {
+        source.close();
+        target.close();
+    }
+    
+    private static HikariDataSource createHikariDataSource(final String databaseName) {
         HikariDataSource result = new HikariDataSource();
         result.setJdbcUrl(String.format("jdbc:h2:mem:%s;DATABASE_TO_UPPER=false;MODE=MySQL", databaseName));
         result.setUsername("root");
@@ -66,7 +72,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithmTest {
         return result;
     }
     
-    private void createTableAndInitData(final PipelineDataSourceWrapper dataSource, final String tableName) throws SQLException {
+    private static void createTableAndInitData(final PipelineDataSourceWrapper dataSource, final String tableName) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             String sql = String.format("CREATE TABLE %s (order_id INT NOT NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id))", tableName);
             connection.createStatement().execute(sql);
@@ -114,15 +120,9 @@ public final class DataMatchDataConsistencyCalculateAlgorithmTest {
         assertThat(sourceCalculateResult.get(), is(targetCalculateResult.get()));
     }
     
-    private DataConsistencyCalculateParameter generateParameter(final PipelineDataSourceWrapper dataSource, final String logicTableName, final Object dataCheckPositionValue) {
+    private DataConsistencyCalculateParameter generateParameter(final PipelineDataSourceWrapper dataSource, final String logicTableName, final Object dataCheckPosition) {
         PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "integer", false, true, true);
         return new DataConsistencyCalculateParameter(dataSource, null, logicTableName, Collections.emptyList(),
-                "MySQL", "MySQL", uniqueKey, dataCheckPositionValue);
-    }
-    
-    @After
-    public void tearDown() throws Exception {
-        source.close();
-        target.close();
+                "MySQL", "MySQL", uniqueKey, dataCheckPosition);
     }
 }
