This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fe989e2981e Extract AbstractRecordSingleTableInventoryCalculator 
(#36746)
fe989e2981e is described below

commit fe989e2981ec209fea307a12c5e275edd0214418
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Sep 29 11:43:10 2025 +0800

    Extract AbstractRecordSingleTableInventoryCalculator (#36746)
---
 .../DataConsistencyCheckUtils.java                 |   6 +-
 .../DataMatchTableDataConsistencyChecker.java      |   4 +-
 ...tractRecordSingleTableInventoryCalculator.java} | 109 +++++++++------------
 .../table/calculator/CalculationContext.java       |   7 +-
 .../RecordSingleTableInventoryCheckCalculator.java |  63 ++++++++++++
 ...rdSingleTableInventoryCheckCalculatorTest.java} |  30 +++---
 6 files changed, 136 insertions(+), 83 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
index afa37863d18..608dc579001 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
@@ -205,11 +205,11 @@ public final class DataConsistencyCheckUtils {
     /**
      * Get first unique key value.
      *
-     * @param rawRecord raw record
+     * @param record record
      * @param uniqueKey unique key
      * @return first unique key value
      */
-    public static Object getFirstUniqueKeyValue(final Map<String, Object> 
rawRecord, final @Nullable String uniqueKey) {
-        return rawRecord.isEmpty() || null == uniqueKey ? null : 
rawRecord.get(uniqueKey);
+    public static Object getFirstUniqueKeyValue(final Map<String, Object> 
record, final @Nullable String uniqueKey) {
+        return record.isEmpty() || null == uniqueKey ? null : 
record.get(uniqueKey);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 48016bcedb2..f4c0ff4898e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -21,7 +21,7 @@ import com.google.common.base.Strings;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCheckCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.StreamingRangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
@@ -129,7 +129,7 @@ public final class DataMatchTableDataConsistencyChecker 
implements TableDataCons
         
         @Override
         protected 
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult> 
buildSingleTableInventoryCalculator() {
-            return new RecordSingleTableInventoryCalculator(chunkSize, 
streamingRangeType);
+            return new RecordSingleTableInventoryCheckCalculator(chunkSize, 
streamingRangeType);
         }
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
similarity index 71%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
index 06f19c256fd..e369662f172 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
@@ -20,8 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 import lombok.RequiredArgsConstructor;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
@@ -42,18 +40,19 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 /**
- * Record single table inventory calculator.
+ * Abstract record single table inventory calculator.
+ *
+ * @param <S> the type of result
+ * @param <C> the type of record
  */
 @HighFrequencyInvocation
 @RequiredArgsConstructor
-public final class RecordSingleTableInventoryCalculator extends 
AbstractStreamingSingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
 {
+public abstract class AbstractRecordSingleTableInventoryCalculator<S, C> 
extends AbstractStreamingSingleTableInventoryCalculator<S> {
     
     private static final int DEFAULT_STREAMING_CHUNK_COUNT = 100;
     
@@ -63,24 +62,24 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
     
     private final StreamingRangeType streamingRangeType;
     
-    public RecordSingleTableInventoryCalculator(final int chunkSize, final 
StreamingRangeType streamingRangeType) {
+    public AbstractRecordSingleTableInventoryCalculator(final int chunkSize, 
final StreamingRangeType streamingRangeType) {
         this(chunkSize, DEFAULT_STREAMING_CHUNK_COUNT, streamingRangeType);
     }
     
     @Override
-    public Optional<SingleTableInventoryCalculatedResult> calculateChunk(final 
SingleTableInventoryCalculateParameter param) {
-        List<Map<String, Object>> records = calculateChunk0(param);
+    public Optional<S> calculateChunk(final 
SingleTableInventoryCalculateParameter param) {
+        List<C> records = calculateChunk0(param);
         if (records.isEmpty()) {
             return Optional.empty();
         }
-        String firstUniqueKey = param.getFirstUniqueKey().getName();
+        Object maxUniqueKeyValue = 
getFirstUniqueKeyValue(records.get(records.size() - 1), 
param.getFirstUniqueKey().getName());
         if (QueryType.RANGE_QUERY == param.getQueryType()) {
-            updateQueryRangeLower(param, records, firstUniqueKey);
+            param.setQueryRange(new QueryRange(maxUniqueKeyValue, false, 
param.getQueryRange().getUpper()));
         }
-        return convertRecordsToResult(records, firstUniqueKey);
+        return Optional.of(convertRecordsToResult(records, maxUniqueKeyValue));
     }
     
-    private List<Map<String, Object>> calculateChunk0(final 
SingleTableInventoryCalculateParameter param) {
+    private List<C> calculateChunk0(final 
SingleTableInventoryCalculateParameter param) {
         InventoryColumnValueReaderEngine columnValueReaderEngine = new 
InventoryColumnValueReaderEngine(param.getDatabaseType());
         try {
             if (QueryType.POINT_QUERY == param.getQueryType()) {
@@ -102,30 +101,29 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         }
     }
     
-    private List<Map<String, Object>> pointQuery(final 
SingleTableInventoryCalculateParameter param, final 
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
-        List<Map<String, Object>> result = new LinkedList<>();
-        CalculationContext calculationContext = 
prepareCalculationContext(param);
+    private List<C> pointQuery(final SingleTableInventoryCalculateParameter 
param, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws 
SQLException {
+        List<C> result = new LinkedList<>();
+        CalculationContext<C> calculationContext = 
prepareCalculationContext(param);
         prepareDatabaseResources(calculationContext, param);
         ResultSet resultSet = calculationContext.getResultSet();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         while (resultSet.next()) {
             ShardingSpherePreconditions.checkState(!isCanceling(), () -> new 
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s", 
param.getTable()));
-            Map<String, Object> record = readRecord(columnValueReaderEngine, 
resultSet, resultSetMetaData);
+            C record = readRecord(resultSet, resultSetMetaData, 
columnValueReaderEngine);
             result.add(record);
         }
         return result;
     }
     
-    private List<Map<String, Object>> allQuery(final 
SingleTableInventoryCalculateParameter param,
-                                               final 
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
-        List<Map<String, Object>> result = new LinkedList<>();
-        CalculationContext calculationContext = 
prepareCalculationContext(param);
+    private List<C> allQuery(final SingleTableInventoryCalculateParameter 
param, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws 
SQLException {
+        List<C> result = new LinkedList<>();
+        CalculationContext<C> calculationContext = 
prepareCalculationContext(param);
         prepareDatabaseResources(calculationContext, param);
         ResultSet resultSet = calculationContext.getResultSet();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         while (resultSet.next()) {
             ShardingSpherePreconditions.checkState(!isCanceling(), () -> new 
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s", 
param.getTable()));
-            result.add(readRecord(columnValueReaderEngine, resultSet, 
resultSetMetaData));
+            result.add(readRecord(resultSet, resultSetMetaData, 
columnValueReaderEngine));
             if (result.size() == chunkSize) {
                 break;
             }
@@ -136,16 +134,16 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         return result;
     }
     
-    private List<Map<String, Object>> rangeQueryWithSingleColumUniqueKey(final 
SingleTableInventoryCalculateParameter param,
-                                                                         final 
InventoryColumnValueReaderEngine columnValueReaderEngine, final int round) 
throws SQLException {
-        List<Map<String, Object>> result = new LinkedList<>();
-        CalculationContext calculationContext = 
prepareCalculationContext(param);
+    private List<C> rangeQueryWithSingleColumUniqueKey(final 
SingleTableInventoryCalculateParameter param,
+                                                       final 
InventoryColumnValueReaderEngine columnValueReaderEngine, final int round) 
throws SQLException {
+        List<C> result = new LinkedList<>();
+        CalculationContext<C> calculationContext = 
prepareCalculationContext(param);
         prepareDatabaseResources(calculationContext, param);
         ResultSet resultSet = calculationContext.getResultSet();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         while (resultSet.next()) {
             ShardingSpherePreconditions.checkState(!isCanceling(), () -> new 
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s", 
param.getTable()));
-            result.add(readRecord(columnValueReaderEngine, resultSet, 
resultSetMetaData));
+            result.add(readRecord(resultSet, resultSetMetaData, 
columnValueReaderEngine));
             if (result.size() == chunkSize) {
                 return result;
             }
@@ -157,9 +155,9 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         return result;
     }
     
-    private List<Map<String, Object>> rangeQueryWithMultiColumUniqueKeys(final 
SingleTableInventoryCalculateParameter param,
-                                                                         final 
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
-        CalculationContext calculationContext = 
prepareCalculationContext(param);
+    private List<C> rangeQueryWithMultiColumUniqueKeys(final 
SingleTableInventoryCalculateParameter param,
+                                                       final 
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
+        CalculationContext<C> calculationContext = 
prepareCalculationContext(param);
         if (calculationContext.getRecordDeque().size() > chunkSize) {
             return queryFromBuffer(calculationContext.getRecordDeque());
         }
@@ -167,21 +165,23 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         return queryFromBuffer(calculationContext.getRecordDeque());
     }
     
-    private void doRangeQueryWithMultiColumUniqueKeys(final 
SingleTableInventoryCalculateParameter param, final CalculationContext 
calculationContext,
+    private void doRangeQueryWithMultiColumUniqueKeys(final 
SingleTableInventoryCalculateParameter param, final CalculationContext<C> 
calculationContext,
                                                       final 
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
         prepareDatabaseResources(calculationContext, param);
         ResultSet resultSet = calculationContext.getResultSet();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-        Map<String, Object> previousRecord = 
calculationContext.getRecordDeque().pollLast();
-        List<Map<String, Object>> duplicateRecords = new LinkedList<>();
+        C previousRecord = calculationContext.getRecordDeque().pollLast();
+        List<C> duplicateRecords = new LinkedList<>();
         if (null != previousRecord) {
             duplicateRecords.add(previousRecord);
         }
         EqualsBuilder equalsBuilder = new EqualsBuilder();
+        String firstUniqueKey = param.getFirstUniqueKey().getName();
         while (resultSet.next()) {
             ShardingSpherePreconditions.checkState(!isCanceling(), () -> new 
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s", 
param.getTable()));
-            Map<String, Object> record = readRecord(columnValueReaderEngine, 
resultSet, resultSetMetaData);
-            if (null == previousRecord || 
DataConsistencyCheckUtils.isFirstUniqueKeyValueMatched(previousRecord, record, 
param.getFirstUniqueKey().getName(), equalsBuilder)) {
+            C record = readRecord(resultSet, resultSetMetaData, 
columnValueReaderEngine);
+            if (null == previousRecord || 
DataConsistencyCheckUtils.isMatched(equalsBuilder,
+                    getFirstUniqueKeyValue(previousRecord, firstUniqueKey), 
getFirstUniqueKeyValue(record, firstUniqueKey))) {
                 duplicateRecords.add(record);
                 previousRecord = record;
                 continue;
@@ -203,9 +203,9 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         }
     }
     
-    private List<Map<String, Object>> pointRangeQuery(final 
SingleTableInventoryCalculateParameter param, final Map<String, Object> 
duplicateRecord,
-                                                      final 
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
-        Object duplicateUniqueKeyValue = 
DataConsistencyCheckUtils.getFirstUniqueKeyValue(duplicateRecord, 
param.getFirstUniqueKey().getName());
+    private List<C> pointRangeQuery(final 
SingleTableInventoryCalculateParameter param, final C duplicateRecord,
+                                    final InventoryColumnValueReaderEngine 
columnValueReaderEngine) throws SQLException {
+        Object duplicateUniqueKeyValue = 
getFirstUniqueKeyValue(duplicateRecord, param.getFirstUniqueKey().getName());
         SingleTableInventoryCalculateParameter newParam = 
buildPointRangeQueryCalculateParameter(param, duplicateUniqueKeyValue);
         try {
             return pointQuery(newParam, columnValueReaderEngine);
@@ -214,10 +214,10 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         }
     }
     
-    private List<Map<String, Object>> queryFromBuffer(final Deque<Map<String, 
Object>> recordDeque) {
-        List<Map<String, Object>> result = new LinkedList<>();
+    private List<C> queryFromBuffer(final Deque<C> recordDeque) {
+        List<C> result = new LinkedList<>();
         while (true) {
-            Map<String, Object> record = recordDeque.pollFirst();
+            C record = recordDeque.pollFirst();
             if (null == record) {
                 break;
             }
@@ -229,17 +229,18 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         return result;
     }
     
-    private CalculationContext prepareCalculationContext(final 
SingleTableInventoryCalculateParameter param) {
-        CalculationContext result = (CalculationContext) 
param.getCalculationContext();
+    @SuppressWarnings("unchecked")
+    private CalculationContext<C> prepareCalculationContext(final 
SingleTableInventoryCalculateParameter param) {
+        CalculationContext<C> result = (CalculationContext<C>) 
param.getCalculationContext();
         if (null != result) {
             return result;
         }
-        result = new CalculationContext();
+        result = new CalculationContext<>();
         param.setCalculationContext(result);
         return result;
     }
     
-    private void prepareDatabaseResources(final CalculationContext 
calculationContext, final SingleTableInventoryCalculateParameter param) throws 
SQLException {
+    private void prepareDatabaseResources(final CalculationContext<C> 
calculationContext, final SingleTableInventoryCalculateParameter param) throws 
SQLException {
         if (calculationContext.isDatabaseResourcesReady()) {
             return;
         }
@@ -317,21 +318,9 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         return result;
     }
     
-    private Map<String, Object> readRecord(final 
InventoryColumnValueReaderEngine columnValueReaderEngine, final ResultSet 
resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
-        Map<String, Object> result = new LinkedHashMap<>();
-        for (int columnIndex = 1, columnCount = 
resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
-            result.put(resultSetMetaData.getColumnLabel(columnIndex), 
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
-        }
-        return result;
-    }
+    protected abstract C readRecord(ResultSet resultSet, ResultSetMetaData 
resultSetMetaData, InventoryColumnValueReaderEngine columnValueReaderEngine) 
throws SQLException;
     
-    private void updateQueryRangeLower(final 
SingleTableInventoryCalculateParameter param, final List<Map<String, Object>> 
records, final String firstUniqueKey) {
-        Object maxUniqueKeyValue = 
DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() - 
1), firstUniqueKey);
-        param.setQueryRange(new QueryRange(maxUniqueKeyValue, false, 
param.getQueryRange().getUpper()));
-    }
+    protected abstract Object getFirstUniqueKeyValue(C record, String 
firstUniqueKey);
     
-    private Optional<SingleTableInventoryCalculatedResult> 
convertRecordsToResult(final List<Map<String, Object>> records, final String 
firstUniqueKey) {
-        Object maxUniqueKeyValue = 
DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() - 
1), firstUniqueKey);
-        return Optional.of(new 
RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records));
-    }
+    protected abstract S convertRecordsToResult(List<C> records, Object 
maxUniqueKeyValue);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
index 927d3ba490f..3d87c18f67f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
@@ -25,14 +25,15 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Deque;
 import java.util.LinkedList;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Calculation context.
+ *
+ * @param <C> the type of record
  */
-public final class CalculationContext implements AutoCloseable {
+public final class CalculationContext<C> implements AutoCloseable {
     
     private final AtomicReference<Connection> connection = new 
AtomicReference<>();
     
@@ -43,7 +44,7 @@ public final class CalculationContext implements 
AutoCloseable {
     private final AtomicBoolean databaseResourcesReady = new 
AtomicBoolean(false);
     
     @Getter
-    private final Deque<Map<String, Object>> recordDeque = new LinkedList<>();
+    private final Deque<C> recordDeque = new LinkedList<>();
     
     /**
      * Get connection.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
new file mode 100644
index 00000000000..aa8fbb4e7b1
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Record single table inventory check calculator.
+ */
+public final class RecordSingleTableInventoryCheckCalculator extends 
AbstractRecordSingleTableInventoryCalculator<SingleTableInventoryCalculatedResult,
 Map<String, Object>> {
+    
+    public RecordSingleTableInventoryCheckCalculator(final int chunkSize, 
final int streamingChunkCount, final StreamingRangeType streamingRangeType) {
+        super(chunkSize, streamingChunkCount, streamingRangeType);
+    }
+    
+    public RecordSingleTableInventoryCheckCalculator(final int chunkSize, 
final StreamingRangeType streamingRangeType) {
+        super(chunkSize, streamingRangeType);
+    }
+    
+    @Override
+    protected Map<String, Object> readRecord(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData, final InventoryColumnValueReaderEngine 
columnValueReaderEngine) throws SQLException {
+        Map<String, Object> result = new LinkedHashMap<>();
+        for (int columnIndex = 1, columnCount = 
resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
+            result.put(resultSetMetaData.getColumnLabel(columnIndex), 
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
+        }
+        return result;
+    }
+    
+    @Override
+    protected Object getFirstUniqueKeyValue(final Map<String, Object> record, 
final String firstUniqueKey) {
+        return DataConsistencyCheckUtils.getFirstUniqueKeyValue(record, 
firstUniqueKey);
+    }
+    
+    @Override
+    protected SingleTableInventoryCalculatedResult 
convertRecordsToResult(final List<Map<String, Object>> records, final Object 
maxUniqueKeyValue) {
+        return new 
RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records);
+    }
+}
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
similarity index 90%
rename from 
test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
rename to 
test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
index 2b397612e4d..beb5247b4dd 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
@@ -50,7 +50,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class RecordSingleTableInventoryCalculatorTest {
+class RecordSingleTableInventoryCheckCalculatorTest {
     
     private static PipelineDataSource dataSource;
     
@@ -105,7 +105,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"SMALL", "LARGE"})
     void assertCalculateOfRangeQueryFromBeginWithOrderIdUniqueKey(final String 
streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(4, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(4, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY, null);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(0, 
false, null), 4, 4);
@@ -114,7 +114,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"SMALL", "LARGE"})
     void assertCalculateOfRangeQueryFromMiddleWithOrderIdUniqueKey(final 
String streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(4, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(4, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY, null);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(4, 
false, null), 4, 8);
@@ -123,14 +123,14 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"SMALL", "LARGE"})
     void assertCalculateOfRangeQueryWithMultiColumnUniqueKeys(final String 
streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, 
true, 6), 8, 6);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, 
false, 6), 3, 6);
     }
     
-    private void assertQueryRangeCalculatedResult(final 
RecordSingleTableInventoryCalculator calculator, final 
SingleTableInventoryCalculateParameter param, final QueryRange queryRange,
+    private void assertQueryRangeCalculatedResult(final 
RecordSingleTableInventoryCheckCalculator calculator, final 
SingleTableInventoryCalculateParameter param, final QueryRange queryRange,
                                                   final int 
expectedRecordsCount, final int expectedMaxUniqueKeyValue) {
         param.setQueryRange(queryRange);
         Optional<SingleTableInventoryCalculatedResult> calculatedResult = 
calculator.calculateChunk(param);
@@ -151,7 +151,7 @@ class RecordSingleTableInventoryCalculatorTest {
             connection.createStatement().execute(
                     "INSERT INTO test3 (user_id,order_id,status) VALUES 
(3,1,'ok'),(3,2,'ok'),(4,3,'ok'),(4,4,'ok'),(5,5,'ok'),(5,6,'ok'),(6,7,'ok'),(6,8,'ok'),(7,9,'ok')");
         }
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"test3"),
                 Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, 
true, 4), 4, 4);
@@ -162,7 +162,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"SMALL", "LARGE"})
     void assertCalculateOfReservedRangeQueryWithMultiColumnUniqueKeys(final 
String streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(3, true, 2));
@@ -181,7 +181,7 @@ class RecordSingleTableInventoryCalculatorTest {
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"test1"),
                 Collections.emptyList(), buildUserIdUniqueKey(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(0, false, null));
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(5, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(5, 
StreamingRangeType.valueOf(streamingRangeType));
         Optional<SingleTableInventoryCalculatedResult> calculateResult = 
calculator.calculateChunk(param);
         QuietlyCloser.close(param.getCalculationContext());
         assertFalse(calculateResult.isPresent());
@@ -197,7 +197,7 @@ class RecordSingleTableInventoryCalculatorTest {
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"test2"),
                 Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(5, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(5, 
StreamingRangeType.valueOf(streamingRangeType));
         Optional<SingleTableInventoryCalculatedResult> calculateResult = 
calculator.calculateChunk(param);
         QuietlyCloser.close(param.getCalculationContext());
         assertFalse(calculateResult.isPresent());
@@ -206,7 +206,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"100,SMALL", "2,SMALL", "3,SMALL", "100,LARGE", "2,LARGE", 
"3,LARGE"})
     void assertCalculateOfRangeQueryAllWithOrderIdUniqueKeyWith3x(final int 
streamingChunkCount, final String streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(3, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
@@ -244,7 +244,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"SMALL", "LARGE"})
     void 
assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith50x100(final String 
streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(50, 100, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(50, 100, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
@@ -270,7 +270,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"100,SMALL", "2,SMALL", "3,SMALL", "100,LARGE", "2,LARGE", 
"3,LARGE"})
     void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith3x(final 
int streamingChunkCount, final String streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(3, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
@@ -308,7 +308,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"100,SMALL", "3,SMALL", "4,SMALL", "100,LARGE", "3,LARGE", 
"4,LARGE"})
     void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith2x(final 
int streamingChunkCount, final String streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(2, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(2, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
@@ -355,7 +355,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"SMALL", "LARGE"})
     void assertCalculateOfPointQuery(final String streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(3, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.POINT_QUERY, null);
         param.setUniqueKeysValues(Arrays.asList(3, 3));
@@ -371,7 +371,7 @@ class RecordSingleTableInventoryCalculatorTest {
     @ParameterizedTest
     @CsvSource({"SMALL", "LARGE"})
     void assertCalculateOfPointRangeQuery(final String streamingRangeType) {
-        RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3, 
StreamingRangeType.valueOf(streamingRangeType));
+        RecordSingleTableInventoryCheckCalculator calculator = new 
RecordSingleTableInventoryCheckCalculator(3, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
                 Collections.emptyList(), buildUserIdUniqueKey(), 
QueryType.POINT_QUERY, null);
         param.setUniqueKeysValues(Collections.singleton(3));


Reply via email to