This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4488af3ad11 Review and refactor data consistency check (#28058)
4488af3ad11 is described below

commit 4488af3ad11356237e58559f20b77fb643e22a63
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Aug 13 17:18:44 2023 +0800

    Review and refactor data consistency check (#28058)
---
 .../DataConsistencyCheckUtils.java                 |   1 +
 ...RecordSingleTableInventoryCalculatedResult.java |   3 -
 .../result/TableDataConsistencyCheckResult.java    |  10 +-
 .../TableDataConsistencyCountCheckResult.java      |  15 +--
 .../yaml/YamlTableDataConsistencyCheckResult.java  |  24 ++++-
 ...YamlTableDataConsistencyCheckResultSwapper.java |   1 -
 .../table/MatchingTableDataConsistencyChecker.java |  28 ++---
 .../RecordSingleTableInventoryCalculator.java      |  12 ++-
 .../ConsistencyCheckDataBuilder.java               | 115 +++++++++++++++++++++
 ...rdSingleTableInventoryCalculatedResultTest.java |  78 +-------------
 .../h2/sqlbuilder}/H2PipelineSQLBuilder.java       |   2 +-
 ...peline.spi.sqlbuilder.DialectPipelineSQLBuilder |   2 +-
 .../RecordSingleTableInventoryCalculatorTest.java  |  69 +++++--------
 13 files changed, 210 insertions(+), 150 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
index 60bc9f44741..adb79c32554 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
@@ -67,6 +67,7 @@ public final class DataConsistencyCheckUtils {
     
     @SneakyThrows(SQLException.class)
     private static boolean isMatched(final EqualsBuilder equalsBuilder, final 
Object thisColumnValue, final Object thatColumnValue) {
+        equalsBuilder.reset();
         if (thisColumnValue instanceof SQLXML && thatColumnValue instanceof 
SQLXML) {
             return ((SQLXML) thisColumnValue).getString().equals(((SQLXML) 
thatColumnValue).getString());
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
index 1253503bf12..1db10f4352b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
 
-import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -41,7 +40,6 @@ public final class RecordSingleTableInventoryCalculatedResult 
implements SingleT
     
     private final int recordsCount;
     
-    @Getter(AccessLevel.NONE)
     private final List<Map<String, Object>> records;
     
     public RecordSingleTableInventoryCalculatedResult(final Object 
maxUniqueKeyValue, final List<Map<String, Object>> records) {
@@ -77,7 +75,6 @@ public final class RecordSingleTableInventoryCalculatedResult 
implements SingleT
         Iterator<Map<String, Object>> thisRecordsIterator = records.iterator();
         Iterator<Map<String, Object>> thatRecordsIterator = 
that.records.iterator();
         while (thisRecordsIterator.hasNext() && thatRecordsIterator.hasNext()) 
{
-            equalsBuilder.reset();
             Map<String, Object> thisRecord = thisRecordsIterator.next();
             Map<String, Object> thatRecord = thatRecordsIterator.next();
             if (thisRecord.size() != thatRecord.size()) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
index c768c3c88b5..7eec6445fbc 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
@@ -27,22 +27,22 @@ import lombok.ToString;
 @ToString
 public final class TableDataConsistencyCheckResult {
     
-    private final TableDataConsistencyCheckIgnoredType ignoredType;
-    
     private final TableDataConsistencyCountCheckResult countCheckResult;
     
     private final TableDataConsistencyContentCheckResult contentCheckResult;
     
+    private final TableDataConsistencyCheckIgnoredType ignoredType;
+    
     public TableDataConsistencyCheckResult(final 
TableDataConsistencyCountCheckResult countCheckResult, final 
TableDataConsistencyContentCheckResult contentCheckResult) {
-        ignoredType = null;
         this.countCheckResult = countCheckResult;
         this.contentCheckResult = contentCheckResult;
+        ignoredType = null;
     }
     
     public TableDataConsistencyCheckResult(final 
TableDataConsistencyCheckIgnoredType ignoredType) {
+        countCheckResult = new TableDataConsistencyCountCheckResult(-1, -1);
+        contentCheckResult = new TableDataConsistencyContentCheckResult(false);
         this.ignoredType = ignoredType;
-        countCheckResult = null;
-        contentCheckResult = null;
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
index 81dd4252319..aaf2b5ef02c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
@@ -18,11 +18,13 @@
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
 /**
  * Table data consistency count check result.
  */
+@RequiredArgsConstructor
 @Getter
 @ToString
 public final class TableDataConsistencyCountCheckResult {
@@ -31,11 +33,12 @@ public final class TableDataConsistencyCountCheckResult {
     
     private final long targetRecordsCount;
     
-    private final boolean matched;
-    
-    public TableDataConsistencyCountCheckResult(final long sourceRecordsCount, 
final long targetRecordsCount) {
-        this.sourceRecordsCount = sourceRecordsCount;
-        this.targetRecordsCount = targetRecordsCount;
-        matched = sourceRecordsCount == targetRecordsCount;
+    /**
+     * Is matched.
+     *
+     * @return true if records count equals between source and target
+     */
+    public boolean isMatched() {
+        return sourceRecordsCount == targetRecordsCount && sourceRecordsCount 
>= 0;
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
index 9c14939bd39..6dc803ec3ed 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
@@ -17,13 +17,16 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
  * Yaml table data consistency check result config.
  */
+@NoArgsConstructor
 @Getter
 @Setter
 public final class YamlTableDataConsistencyCheckResult implements 
YamlConfiguration {
@@ -34,6 +37,11 @@ public final class YamlTableDataConsistencyCheckResult 
implements YamlConfigurat
     
     private String ignoredType;
     
+    public YamlTableDataConsistencyCheckResult(final 
YamlTableDataConsistencyCountCheckResult countCheckResult, final 
YamlTableDataConsistencyContentCheckResult contentCheckResult) {
+        this.countCheckResult = countCheckResult;
+        this.contentCheckResult = contentCheckResult;
+    }
+    
     /**
      * YAML table data consistency count result.
      */
@@ -45,12 +53,26 @@ public final class YamlTableDataConsistencyCheckResult 
implements YamlConfigurat
         
         private long targetRecordsCount;
         
-        private boolean matched;
+        /**
+         * Add records count.
+         *
+         * @param delta delta count
+         * @param onSource add on source or target
+         */
+        public void addRecordsCount(final long delta, final boolean onSource) {
+            if (onSource) {
+                sourceRecordsCount += delta;
+            } else {
+                targetRecordsCount += delta;
+            }
+        }
     }
     
     /**
      * YAML table data consistency content result.
      */
+    @NoArgsConstructor
+    @AllArgsConstructor
     @Getter
     @Setter
     public static class YamlTableDataConsistencyContentCheckResult implements 
YamlConfiguration {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
index 619eeaa95da..4003f101813 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
@@ -42,7 +42,6 @@ public final class YamlTableDataConsistencyCheckResultSwapper 
implements YamlCon
         YamlTableDataConsistencyCountCheckResult countCheckResult = new 
YamlTableDataConsistencyCountCheckResult();
         
countCheckResult.setSourceRecordsCount(data.getCountCheckResult().getSourceRecordsCount());
         
countCheckResult.setTargetRecordsCount(data.getCountCheckResult().getTargetRecordsCount());
-        countCheckResult.setMatched(data.getContentCheckResult().isMatched());
         result.setCountCheckResult(countCheckResult);
         YamlTableDataConsistencyContentCheckResult contentCheckResult = new 
YamlTableDataConsistencyContentCheckResult();
         
contentCheckResult.setMatched(data.getContentCheckResult().isMatched());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
index cb388404dc4..b88f7bf2d17 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
@@ -23,8 +23,10 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyContentCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyCountCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
@@ -82,19 +84,17 @@ public abstract class MatchingTableDataConsistencyChecker 
implements TableDataCo
     private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final 
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
                                                                           
final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
                                                                           
final TableDataConsistencyCheckParameter param, final ThreadPoolExecutor 
executor) {
-        long sourceRecordsCount = 0;
-        long targetRecordsCount = 0;
-        boolean contentMatched = true;
+        YamlTableDataConsistencyCheckResult checkResult = new 
YamlTableDataConsistencyCheckResult(new 
YamlTableDataConsistencyCountCheckResult(), new 
YamlTableDataConsistencyContentCheckResult(true));
         while (sourceCalculatedResults.hasNext() && 
targetCalculatedResults.hasNext()) {
             if (null != param.getReadRateLimitAlgorithm()) {
                 
param.getReadRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
             }
             SingleTableInventoryCalculatedResult sourceCalculatedResult = 
waitFuture(executor.submit(sourceCalculatedResults::next));
             SingleTableInventoryCalculatedResult targetCalculatedResult = 
waitFuture(executor.submit(targetCalculatedResults::next));
-            sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
-            targetRecordsCount += targetCalculatedResult.getRecordsCount();
-            contentMatched = Objects.equals(sourceCalculatedResult, 
targetCalculatedResult);
-            if (!contentMatched) {
+            
checkResult.getCountCheckResult().addRecordsCount(sourceCalculatedResult.getRecordsCount(),
 true);
+            
checkResult.getCountCheckResult().addRecordsCount(targetCalculatedResult.getRecordsCount(),
 false);
+            if (!Objects.equals(sourceCalculatedResult, 
targetCalculatedResult)) {
+                checkResult.getContentCheckResult().setMatched(false);
                 log.info("content matched false, jobId={}, sourceTable={}, 
targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), 
param.getTargetTable(), param.getUniqueKeys());
                 break;
             }
@@ -108,12 +108,16 @@ public abstract class MatchingTableDataConsistencyChecker 
implements TableDataCo
         }
         if (sourceCalculatedResults.hasNext()) {
             // TODO Refactor SingleTableInventoryCalculatedResult to represent 
inaccurate number
-            return new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(sourceRecordsCount + 1, 
targetRecordsCount), new TableDataConsistencyContentCheckResult(false));
+            checkResult.getCountCheckResult().addRecordsCount(1, true);
+            checkResult.getContentCheckResult().setMatched(false);
+            return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
         }
         if (targetCalculatedResults.hasNext()) {
-            return new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount + 
1), new TableDataConsistencyContentCheckResult(false));
+            checkResult.getCountCheckResult().addRecordsCount(1, false);
+            checkResult.getContentCheckResult().setMatched(false);
+            return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
         }
-        return new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), 
new TableDataConsistencyContentCheckResult(contentMatched));
+        return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
     }
     
     // TODO use digest (crc32, murmurhash)
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index a3d6b811f3f..abcbc0e2143 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -122,10 +122,7 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
             preparedStatement.setFetchSize(chunkSize);
         }
         calculationContext.setPreparedStatement(preparedStatement);
-        Object tableCheckPosition = param.getTableCheckPosition();
-        if (null != tableCheckPosition) {
-            preparedStatement.setObject(1, tableCheckPosition);
-        }
+        setParameters(preparedStatement, param);
         ResultSet resultSet = preparedStatement.executeQuery();
         calculationContext.setResultSet(resultSet);
     }
@@ -140,6 +137,13 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         return 
pipelineSQLBuilder.buildQueryAllOrderingSQL(param.getSchemaName(), 
param.getLogicTableName(), columnNames, param.getFirstUniqueKey().getName(), 
firstQuery);
     }
     
+    private void setParameters(final PreparedStatement preparedStatement, 
final SingleTableInventoryCalculateParameter param) throws SQLException {
+        Object tableCheckPosition = param.getTableCheckPosition();
+        if (null != tableCheckPosition) {
+            preparedStatement.setObject(1, tableCheckPosition);
+        }
+    }
+    
     @RequiredArgsConstructor
     private static final class CalculationContext implements AutoCloseable {
         
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
new file mode 100644
index 00000000000..9284bdff825
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public final class ConsistencyCheckDataBuilder {
+    
+    /**
+     * Build fixed full type record.
+     *
+     * @param id id
+     * @return built record
+     */
+    public static Map<String, Object> buildFixedFullTypeRecord(final int id) {
+        Map<String, Object> result = new LinkedHashMap<>();
+        result.put("id", id);
+        result.put("c_bool", true);
+        result.put("c_int1", Byte.MAX_VALUE);
+        result.put("c_int2", Short.MAX_VALUE);
+        result.put("c_int4", Integer.MAX_VALUE);
+        result.put("c_int8", Long.MAX_VALUE);
+        result.put("c_float", 1.23F);
+        result.put("c_double", 2.3456D);
+        result.put("c_decimal", BigDecimal.valueOf(1.23456789D));
+        result.put("c_varchar", "ok");
+        result.put("c_time", new Time(123456789L));
+        result.put("c_date", new Date(123456789L));
+        result.put("c_timestamp", new Timestamp(123456789L));
+        result.put("c_array", new int[]{1, 2, 3});
+        result.put("c_blob", null);
+        return result;
+    }
+    
+    /**
+     * Modify column value randomly.
+     *
+     * @param record record
+     * @param key which key will be modified
+     * @return original record
+     */
+    public static Map<String, Object> modifyColumnValueRandomly(final 
Map<String, Object> record, final String key) {
+        Object value = record.get(key);
+        record.put(key, getModifiedValue(value));
+        return record;
+    }
+    
+    private static Object getModifiedValue(final Object value) {
+        if (null == value) {
+            return new Object();
+        }
+        if (value instanceof Boolean) {
+            return !((Boolean) value);
+        }
+        if (value instanceof Byte) {
+            return (byte) ((Byte) value - 1);
+        }
+        if (value instanceof Short) {
+            return (short) ((Short) value - 1);
+        }
+        if (value instanceof Integer) {
+            return (Integer) value - 1;
+        }
+        if (value instanceof Long) {
+            return (Long) value - 1L;
+        }
+        if (value instanceof Float) {
+            return (Float) value - 1F;
+        }
+        if (value instanceof Double) {
+            return (Double) value - 1D;
+        }
+        if (value instanceof BigDecimal) {
+            return ((BigDecimal) value).subtract(BigDecimal.ONE);
+        }
+        if (value instanceof String) {
+            return value + "-";
+        }
+        if (value instanceof Time) {
+            return new Time(((Time) value).getTime() - 1);
+        }
+        if (value instanceof Date) {
+            return new Date(((Date) value).getTime() - 1);
+        }
+        if (value instanceof Timestamp) {
+            return new Timestamp(((Timestamp) value).getTime() - 1);
+        }
+        if (value instanceof int[]) {
+            int[] result = ((int[]) value).clone();
+            result[0] = result[0] - 1;
+            return result;
+        }
+        return value;
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
index dc2b86e2f81..7603b8de762 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
@@ -17,16 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
 
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckDataBuilder;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -49,25 +46,6 @@ class RecordSingleTableInventoryCalculatedResultTest {
         assertThat(actual, is(expected));
     }
     
-    private Map<String, Object> buildFixedFullTypeRecord() {
-        Map<String, Object> result = new LinkedHashMap<>();
-        result.put("c_bool", true);
-        result.put("c_int1", Byte.MAX_VALUE);
-        result.put("c_int2", Short.MAX_VALUE);
-        result.put("c_int4", Integer.MAX_VALUE);
-        result.put("c_int8", Long.MAX_VALUE);
-        result.put("c_float", 1.23F);
-        result.put("c_double", 2.3456D);
-        result.put("c_decimal", BigDecimal.valueOf(1.23456789D));
-        result.put("c_varchar", "ok");
-        result.put("c_time", new Time(123456789L));
-        result.put("c_date", new Date(123456789L));
-        result.put("c_timestamp", new Timestamp(123456789L));
-        result.put("c_array", new int[]{1, 2, 3});
-        result.put("c_blob", null);
-        return result;
-    }
-    
     @Test
     void assertFullTypeRecordsEqualsWithDifferentDecimalScale() {
         RecordSingleTableInventoryCalculatedResult expected = new 
RecordSingleTableInventoryCalculatedResult(1000, 
Collections.singletonList(buildFixedFullTypeRecord()));
@@ -107,57 +85,11 @@ class RecordSingleTableInventoryCalculatedResultTest {
         });
     }
     
-    private Map<String, Object> modifyColumnValueRandomly(final Map<String, 
Object> record, final String key) {
-        Object value = record.get(key);
-        record.put(key, getModifiedValue(value));
-        return record;
+    private Map<String, Object> buildFixedFullTypeRecord() {
+        return ConsistencyCheckDataBuilder.buildFixedFullTypeRecord(1);
     }
     
-    private Object getModifiedValue(final Object value) {
-        if (null == value) {
-            return new Object();
-        }
-        if (value instanceof Boolean) {
-            return !((Boolean) value);
-        }
-        if (value instanceof Byte) {
-            return (Byte) value - 1;
-        }
-        if (value instanceof Short) {
-            return (Short) value - 1;
-        }
-        if (value instanceof Integer) {
-            return (Integer) value - 1;
-        }
-        if (value instanceof Long) {
-            return (Long) value - 1;
-        }
-        if (value instanceof Float) {
-            return (Float) value - 1;
-        }
-        if (value instanceof Double) {
-            return (Double) value - 1;
-        }
-        if (value instanceof BigDecimal) {
-            return ((BigDecimal) value).subtract(BigDecimal.ONE);
-        }
-        if (value instanceof String) {
-            return value + "-";
-        }
-        if (value instanceof Time) {
-            return new Time(((Time) value).getTime() - 1);
-        }
-        if (value instanceof Date) {
-            return new Date(((Date) value).getTime() - 1);
-        }
-        if (value instanceof Timestamp) {
-            return new Timestamp(((Timestamp) value).getTime() - 1);
-        }
-        if (value instanceof int[]) {
-            int[] result = ((int[]) value).clone();
-            result[0] = result[0] - 1;
-            return result;
-        }
-        return value;
+    private Map<String, Object> modifyColumnValueRandomly(final Map<String, 
Object> record, final String key) {
+        return ConsistencyCheckDataBuilder.modifyColumnValueRandomly(record, 
key);
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/sqlbuilder/H2PipelineSQLBuilder.java
similarity index 94%
rename from 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
rename to 
kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/sqlbuilder/H2PipelineSQLBuilder.java
index e50a85f8c89..6025f61e06b 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/sqlbuilder/H2PipelineSQLBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.h2.sqlbuilder;
 
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
 
diff --git 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
 
b/kernel/data-pipeline/dialect/h2/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 90%
rename from 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
rename to 
kernel/data-pipeline/dialect/h2/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
index 582509d7fc4..525cc3bb507 100644
--- 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
+++ 
b/kernel/data-pipeline/dialect/h2/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2PipelineSQLBuilder
+org.apache.shardingsphere.data.pipeline.h2.sqlbuilder.H2PipelineSQLBuilder
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index 14c470df844..6b4c14e1f71 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -18,6 +18,7 @@
 package 
org.apache.shardingsphere.test.it.data.pipeline.core.consistencycheck.table.calculator;
 
 import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
@@ -29,7 +30,6 @@ import 
org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -45,22 +45,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class RecordSingleTableInventoryCalculatorTest {
     
-    private static PipelineDataSourceWrapper source;
-    
-    private static PipelineDataSourceWrapper target;
+    private static PipelineDataSourceWrapper dataSource;
     
     @BeforeAll
     static void setUp() throws Exception {
-        source = new 
PipelineDataSourceWrapper(createHikariDataSource("source_ds"), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
-        createTableAndInitData(source, "t_order_copy");
-        target = new 
PipelineDataSourceWrapper(createHikariDataSource("target_ds"), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
-        createTableAndInitData(target, "t_order");
+        dataSource = new 
PipelineDataSourceWrapper(createHikariDataSource("calc_" + 
RandomStringUtils.randomAlphanumeric(9)), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
+        createTableAndInitData(dataSource);
     }
     
     @AfterAll
     static void tearDown() throws Exception {
-        source.close();
-        target.close();
+        dataSource.close();
     }
     
     private static HikariDataSource createHikariDataSource(final String 
databaseName) {
@@ -75,11 +70,11 @@ class RecordSingleTableInventoryCalculatorTest {
         return result;
     }
     
-    private static void createTableAndInitData(final PipelineDataSourceWrapper 
dataSource, final String tableName) throws SQLException {
+    private static void createTableAndInitData(final PipelineDataSourceWrapper 
dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            String sql = String.format("CREATE TABLE %s (order_id INT NOT 
NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id))", 
tableName);
+            String sql = "CREATE TABLE t_order (order_id INT PRIMARY KEY, 
user_id INT NOT NULL, status VARCHAR(12))";
             connection.createStatement().execute(sql);
-            PreparedStatement preparedStatement = 
connection.prepareStatement(String.format("INSERT INTO %s (order_id, user_id, 
status) VALUES (?, ?, ?)", tableName));
+            PreparedStatement preparedStatement = 
connection.prepareStatement("INSERT INTO t_order (order_id, user_id, status) 
VALUES (?, ?, ?)");
             for (int i = 0; i < 10; i++) {
                 preparedStatement.setInt(1, i + 1);
                 preparedStatement.setInt(2, i + 1);
@@ -90,41 +85,29 @@ class RecordSingleTableInventoryCalculatorTest {
     }
     
     @Test
-    void assertCalculateFromBegin() throws ReflectiveOperationException {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000);
-        
Plugins.getMemberAccessor().set(RecordSingleTableInventoryCalculator.class.getDeclaredField("chunkSize"),
 calculator, 5);
-        SingleTableInventoryCalculateParameter sourceParam = 
generateParameter(source, "t_order_copy", 0);
-        Optional<SingleTableInventoryCalculatedResult> sourceCalculateResult = 
calculator.calculateChunk(sourceParam);
-        SingleTableInventoryCalculateParameter targetParam = 
generateParameter(target, "t_order", 0);
-        Optional<SingleTableInventoryCalculatedResult> targetCalculateResult = 
calculator.calculateChunk(targetParam);
-        assertTrue(sourceCalculateResult.isPresent());
-        assertTrue(targetCalculateResult.isPresent());
-        
assertTrue(sourceCalculateResult.get().getMaxUniqueKeyValue().isPresent());
-        
assertTrue(targetCalculateResult.get().getMaxUniqueKeyValue().isPresent());
-        assertThat(sourceCalculateResult.get().getMaxUniqueKeyValue().get(), 
is(targetCalculateResult.get().getMaxUniqueKeyValue().get()));
-        assertThat(targetCalculateResult.get().getMaxUniqueKeyValue().get(), 
is(5L));
-        assertThat(sourceCalculateResult.get(), 
is(targetCalculateResult.get()));
+    void assertCalculateOfAllQueryFromBegin() {
+        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(5);
+        SingleTableInventoryCalculateParameter param = 
generateParameter(dataSource, 0);
+        Optional<SingleTableInventoryCalculatedResult> calculateResult = 
calculator.calculateChunk(param);
+        assertTrue(calculateResult.isPresent());
+        SingleTableInventoryCalculatedResult actual = calculateResult.get();
+        assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(5L));
     }
     
     @Test
-    void assertCalculateFromMiddle() throws ReflectiveOperationException {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000);
-        
Plugins.getMemberAccessor().set(RecordSingleTableInventoryCalculator.class.getDeclaredField("chunkSize"),
 calculator, 5);
-        SingleTableInventoryCalculateParameter sourceParam = 
generateParameter(source, "t_order_copy", 5);
-        Optional<SingleTableInventoryCalculatedResult> sourceCalculateResult = 
calculator.calculateChunk(sourceParam);
-        SingleTableInventoryCalculateParameter targetParam = 
generateParameter(target, "t_order", 5);
-        Optional<SingleTableInventoryCalculatedResult> targetCalculateResult = 
calculator.calculateChunk(targetParam);
-        assertTrue(sourceCalculateResult.isPresent());
-        assertTrue(targetCalculateResult.isPresent());
-        
assertTrue(sourceCalculateResult.get().getMaxUniqueKeyValue().isPresent());
-        
assertTrue(targetCalculateResult.get().getMaxUniqueKeyValue().isPresent());
-        assertThat(sourceCalculateResult.get().getMaxUniqueKeyValue().get(), 
is(targetCalculateResult.get().getMaxUniqueKeyValue().get()));
-        assertThat(targetCalculateResult.get().getMaxUniqueKeyValue().get(), 
is(10L));
-        assertThat(sourceCalculateResult.get(), 
is(targetCalculateResult.get()));
+    void assertCalculateOfAllQueryFromMiddle() {
+        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(5);
+        SingleTableInventoryCalculateParameter param = 
generateParameter(dataSource, 5);
+        Optional<SingleTableInventoryCalculatedResult> calculateResult = 
calculator.calculateChunk(param);
+        assertTrue(calculateResult.isPresent());
+        SingleTableInventoryCalculatedResult actual = calculateResult.get();
+        assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+        assertThat(actual.getMaxUniqueKeyValue().get(), is(10L));
     }
     
-    private SingleTableInventoryCalculateParameter generateParameter(final 
PipelineDataSourceWrapper dataSource, final String logicTableName, final Object 
dataCheckPosition) {
+    private SingleTableInventoryCalculateParameter generateParameter(final 
PipelineDataSourceWrapper dataSource, final Object dataCheckPosition) {
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(new PipelineColumnMetaData(1, "order_id", 
Types.INTEGER, "integer", false, true, true));
-        return new SingleTableInventoryCalculateParameter(dataSource, new 
SchemaTableName(null, logicTableName), Collections.emptyList(), uniqueKeys, 
dataCheckPosition);
+        return new SingleTableInventoryCalculateParameter(dataSource, new 
SchemaTableName(null, "t_order"), Collections.emptyList(), uniqueKeys, 
dataCheckPosition);
     }
 }


Reply via email to