This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fe989e2981e Extract AbstractRecordSingleTableInventoryCalculator
(#36746)
fe989e2981e is described below
commit fe989e2981ec209fea307a12c5e275edd0214418
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Sep 29 11:43:10 2025 +0800
Extract AbstractRecordSingleTableInventoryCalculator (#36746)
---
.../DataConsistencyCheckUtils.java | 6 +-
.../DataMatchTableDataConsistencyChecker.java | 4 +-
...tractRecordSingleTableInventoryCalculator.java} | 109 +++++++++------------
.../table/calculator/CalculationContext.java | 7 +-
.../RecordSingleTableInventoryCheckCalculator.java | 63 ++++++++++++
...rdSingleTableInventoryCheckCalculatorTest.java} | 30 +++---
6 files changed, 136 insertions(+), 83 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
index afa37863d18..608dc579001 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
@@ -205,11 +205,11 @@ public final class DataConsistencyCheckUtils {
/**
* Get first unique key value.
*
- * @param rawRecord raw record
+ * @param record record
* @param uniqueKey unique key
* @return first unique key value
*/
- public static Object getFirstUniqueKeyValue(final Map<String, Object>
rawRecord, final @Nullable String uniqueKey) {
- return rawRecord.isEmpty() || null == uniqueKey ? null :
rawRecord.get(uniqueKey);
+ public static Object getFirstUniqueKeyValue(final Map<String, Object>
record, final @Nullable String uniqueKey) {
+ return record.isEmpty() || null == uniqueKey ? null :
record.get(uniqueKey);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 48016bcedb2..f4c0ff4898e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -21,7 +21,7 @@ import com.google.common.base.Strings;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCheckCalculator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.StreamingRangeType;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
@@ -129,7 +129,7 @@ public final class DataMatchTableDataConsistencyChecker
implements TableDataCons
@Override
protected
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
buildSingleTableInventoryCalculator() {
- return new RecordSingleTableInventoryCalculator(chunkSize,
streamingRangeType);
+ return new RecordSingleTableInventoryCheckCalculator(chunkSize,
streamingRangeType);
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
similarity index 71%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
index 06f19c256fd..e369662f172 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
@@ -20,8 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.builder.EqualsBuilder;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
@@ -42,18 +40,19 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
/**
- * Record single table inventory calculator.
+ * Abstract record single table inventory calculator.
+ *
+ * @param <S> the type of result
+ * @param <C> the type of record
*/
@HighFrequencyInvocation
@RequiredArgsConstructor
-public final class RecordSingleTableInventoryCalculator extends
AbstractStreamingSingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
{
+public abstract class AbstractRecordSingleTableInventoryCalculator<S, C>
extends AbstractStreamingSingleTableInventoryCalculator<S> {
private static final int DEFAULT_STREAMING_CHUNK_COUNT = 100;
@@ -63,24 +62,24 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
private final StreamingRangeType streamingRangeType;
- public RecordSingleTableInventoryCalculator(final int chunkSize, final
StreamingRangeType streamingRangeType) {
+ public AbstractRecordSingleTableInventoryCalculator(final int chunkSize,
final StreamingRangeType streamingRangeType) {
this(chunkSize, DEFAULT_STREAMING_CHUNK_COUNT, streamingRangeType);
}
@Override
- public Optional<SingleTableInventoryCalculatedResult> calculateChunk(final
SingleTableInventoryCalculateParameter param) {
- List<Map<String, Object>> records = calculateChunk0(param);
+ public Optional<S> calculateChunk(final
SingleTableInventoryCalculateParameter param) {
+ List<C> records = calculateChunk0(param);
if (records.isEmpty()) {
return Optional.empty();
}
- String firstUniqueKey = param.getFirstUniqueKey().getName();
+ Object maxUniqueKeyValue =
getFirstUniqueKeyValue(records.get(records.size() - 1),
param.getFirstUniqueKey().getName());
if (QueryType.RANGE_QUERY == param.getQueryType()) {
- updateQueryRangeLower(param, records, firstUniqueKey);
+ param.setQueryRange(new QueryRange(maxUniqueKeyValue, false,
param.getQueryRange().getUpper()));
}
- return convertRecordsToResult(records, firstUniqueKey);
+ return Optional.of(convertRecordsToResult(records, maxUniqueKeyValue));
}
- private List<Map<String, Object>> calculateChunk0(final
SingleTableInventoryCalculateParameter param) {
+ private List<C> calculateChunk0(final
SingleTableInventoryCalculateParameter param) {
InventoryColumnValueReaderEngine columnValueReaderEngine = new
InventoryColumnValueReaderEngine(param.getDatabaseType());
try {
if (QueryType.POINT_QUERY == param.getQueryType()) {
@@ -102,30 +101,29 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
}
}
- private List<Map<String, Object>> pointQuery(final
SingleTableInventoryCalculateParameter param, final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
- List<Map<String, Object>> result = new LinkedList<>();
- CalculationContext calculationContext =
prepareCalculationContext(param);
+ private List<C> pointQuery(final SingleTableInventoryCalculateParameter
param, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws
SQLException {
+ List<C> result = new LinkedList<>();
+ CalculationContext<C> calculationContext =
prepareCalculationContext(param);
prepareDatabaseResources(calculationContext, param);
ResultSet resultSet = calculationContext.getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () -> new
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s",
param.getTable()));
- Map<String, Object> record = readRecord(columnValueReaderEngine,
resultSet, resultSetMetaData);
+ C record = readRecord(resultSet, resultSetMetaData,
columnValueReaderEngine);
result.add(record);
}
return result;
}
- private List<Map<String, Object>> allQuery(final
SingleTableInventoryCalculateParameter param,
- final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
- List<Map<String, Object>> result = new LinkedList<>();
- CalculationContext calculationContext =
prepareCalculationContext(param);
+ private List<C> allQuery(final SingleTableInventoryCalculateParameter
param, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws
SQLException {
+ List<C> result = new LinkedList<>();
+ CalculationContext<C> calculationContext =
prepareCalculationContext(param);
prepareDatabaseResources(calculationContext, param);
ResultSet resultSet = calculationContext.getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () -> new
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s",
param.getTable()));
- result.add(readRecord(columnValueReaderEngine, resultSet,
resultSetMetaData));
+ result.add(readRecord(resultSet, resultSetMetaData,
columnValueReaderEngine));
if (result.size() == chunkSize) {
break;
}
@@ -136,16 +134,16 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
return result;
}
- private List<Map<String, Object>> rangeQueryWithSingleColumUniqueKey(final
SingleTableInventoryCalculateParameter param,
- final
InventoryColumnValueReaderEngine columnValueReaderEngine, final int round)
throws SQLException {
- List<Map<String, Object>> result = new LinkedList<>();
- CalculationContext calculationContext =
prepareCalculationContext(param);
+ private List<C> rangeQueryWithSingleColumUniqueKey(final
SingleTableInventoryCalculateParameter param,
+ final
InventoryColumnValueReaderEngine columnValueReaderEngine, final int round)
throws SQLException {
+ List<C> result = new LinkedList<>();
+ CalculationContext<C> calculationContext =
prepareCalculationContext(param);
prepareDatabaseResources(calculationContext, param);
ResultSet resultSet = calculationContext.getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () -> new
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s",
param.getTable()));
- result.add(readRecord(columnValueReaderEngine, resultSet,
resultSetMetaData));
+ result.add(readRecord(resultSet, resultSetMetaData,
columnValueReaderEngine));
if (result.size() == chunkSize) {
return result;
}
@@ -157,9 +155,9 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
return result;
}
- private List<Map<String, Object>> rangeQueryWithMultiColumUniqueKeys(final
SingleTableInventoryCalculateParameter param,
- final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
- CalculationContext calculationContext =
prepareCalculationContext(param);
+ private List<C> rangeQueryWithMultiColumUniqueKeys(final
SingleTableInventoryCalculateParameter param,
+ final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
+ CalculationContext<C> calculationContext =
prepareCalculationContext(param);
if (calculationContext.getRecordDeque().size() > chunkSize) {
return queryFromBuffer(calculationContext.getRecordDeque());
}
@@ -167,21 +165,23 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
return queryFromBuffer(calculationContext.getRecordDeque());
}
- private void doRangeQueryWithMultiColumUniqueKeys(final
SingleTableInventoryCalculateParameter param, final CalculationContext
calculationContext,
+ private void doRangeQueryWithMultiColumUniqueKeys(final
SingleTableInventoryCalculateParameter param, final CalculationContext<C>
calculationContext,
final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
prepareDatabaseResources(calculationContext, param);
ResultSet resultSet = calculationContext.getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- Map<String, Object> previousRecord =
calculationContext.getRecordDeque().pollLast();
- List<Map<String, Object>> duplicateRecords = new LinkedList<>();
+ C previousRecord = calculationContext.getRecordDeque().pollLast();
+ List<C> duplicateRecords = new LinkedList<>();
if (null != previousRecord) {
duplicateRecords.add(previousRecord);
}
EqualsBuilder equalsBuilder = new EqualsBuilder();
+ String firstUniqueKey = param.getFirstUniqueKey().getName();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () -> new
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s",
param.getTable()));
- Map<String, Object> record = readRecord(columnValueReaderEngine,
resultSet, resultSetMetaData);
- if (null == previousRecord ||
DataConsistencyCheckUtils.isFirstUniqueKeyValueMatched(previousRecord, record,
param.getFirstUniqueKey().getName(), equalsBuilder)) {
+ C record = readRecord(resultSet, resultSetMetaData,
columnValueReaderEngine);
+ if (null == previousRecord ||
DataConsistencyCheckUtils.isMatched(equalsBuilder,
+ getFirstUniqueKeyValue(previousRecord, firstUniqueKey),
getFirstUniqueKeyValue(record, firstUniqueKey))) {
duplicateRecords.add(record);
previousRecord = record;
continue;
@@ -203,9 +203,9 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
}
}
- private List<Map<String, Object>> pointRangeQuery(final
SingleTableInventoryCalculateParameter param, final Map<String, Object>
duplicateRecord,
- final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
- Object duplicateUniqueKeyValue =
DataConsistencyCheckUtils.getFirstUniqueKeyValue(duplicateRecord,
param.getFirstUniqueKey().getName());
+ private List<C> pointRangeQuery(final
SingleTableInventoryCalculateParameter param, final C duplicateRecord,
+ final InventoryColumnValueReaderEngine
columnValueReaderEngine) throws SQLException {
+ Object duplicateUniqueKeyValue =
getFirstUniqueKeyValue(duplicateRecord, param.getFirstUniqueKey().getName());
SingleTableInventoryCalculateParameter newParam =
buildPointRangeQueryCalculateParameter(param, duplicateUniqueKeyValue);
try {
return pointQuery(newParam, columnValueReaderEngine);
@@ -214,10 +214,10 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
}
}
- private List<Map<String, Object>> queryFromBuffer(final Deque<Map<String,
Object>> recordDeque) {
- List<Map<String, Object>> result = new LinkedList<>();
+ private List<C> queryFromBuffer(final Deque<C> recordDeque) {
+ List<C> result = new LinkedList<>();
while (true) {
- Map<String, Object> record = recordDeque.pollFirst();
+ C record = recordDeque.pollFirst();
if (null == record) {
break;
}
@@ -229,17 +229,18 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
return result;
}
- private CalculationContext prepareCalculationContext(final
SingleTableInventoryCalculateParameter param) {
- CalculationContext result = (CalculationContext)
param.getCalculationContext();
+ @SuppressWarnings("unchecked")
+ private CalculationContext<C> prepareCalculationContext(final
SingleTableInventoryCalculateParameter param) {
+ CalculationContext<C> result = (CalculationContext<C>)
param.getCalculationContext();
if (null != result) {
return result;
}
- result = new CalculationContext();
+ result = new CalculationContext<>();
param.setCalculationContext(result);
return result;
}
- private void prepareDatabaseResources(final CalculationContext
calculationContext, final SingleTableInventoryCalculateParameter param) throws
SQLException {
+ private void prepareDatabaseResources(final CalculationContext<C>
calculationContext, final SingleTableInventoryCalculateParameter param) throws
SQLException {
if (calculationContext.isDatabaseResourcesReady()) {
return;
}
@@ -317,21 +318,9 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
return result;
}
- private Map<String, Object> readRecord(final
InventoryColumnValueReaderEngine columnValueReaderEngine, final ResultSet
resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
- Map<String, Object> result = new LinkedHashMap<>();
- for (int columnIndex = 1, columnCount =
resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
- result.put(resultSetMetaData.getColumnLabel(columnIndex),
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
- }
- return result;
- }
+ protected abstract C readRecord(ResultSet resultSet, ResultSetMetaData
resultSetMetaData, InventoryColumnValueReaderEngine columnValueReaderEngine)
throws SQLException;
- private void updateQueryRangeLower(final
SingleTableInventoryCalculateParameter param, final List<Map<String, Object>>
records, final String firstUniqueKey) {
- Object maxUniqueKeyValue =
DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() -
1), firstUniqueKey);
- param.setQueryRange(new QueryRange(maxUniqueKeyValue, false,
param.getQueryRange().getUpper()));
- }
+ protected abstract Object getFirstUniqueKeyValue(C record, String
firstUniqueKey);
- private Optional<SingleTableInventoryCalculatedResult>
convertRecordsToResult(final List<Map<String, Object>> records, final String
firstUniqueKey) {
- Object maxUniqueKeyValue =
DataConsistencyCheckUtils.getFirstUniqueKeyValue(records.get(records.size() -
1), firstUniqueKey);
- return Optional.of(new
RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records));
- }
+ protected abstract S convertRecordsToResult(List<C> records, Object
maxUniqueKeyValue);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
index 927d3ba490f..3d87c18f67f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
@@ -25,14 +25,15 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Deque;
import java.util.LinkedList;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Calculation context.
+ *
+ * @param <C> the type of record
*/
-public final class CalculationContext implements AutoCloseable {
+public final class CalculationContext<C> implements AutoCloseable {
private final AtomicReference<Connection> connection = new
AtomicReference<>();
@@ -43,7 +44,7 @@ public final class CalculationContext implements
AutoCloseable {
private final AtomicBoolean databaseResourcesReady = new
AtomicBoolean(false);
@Getter
- private final Deque<Map<String, Object>> recordDeque = new LinkedList<>();
+ private final Deque<C> recordDeque = new LinkedList<>();
/**
* Get connection.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
new file mode 100644
index 00000000000..aa8fbb4e7b1
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Record single table inventory check calculator.
+ */
+public final class RecordSingleTableInventoryCheckCalculator extends
AbstractRecordSingleTableInventoryCalculator<SingleTableInventoryCalculatedResult,
Map<String, Object>> {
+
+ public RecordSingleTableInventoryCheckCalculator(final int chunkSize,
final int streamingChunkCount, final StreamingRangeType streamingRangeType) {
+ super(chunkSize, streamingChunkCount, streamingRangeType);
+ }
+
+ public RecordSingleTableInventoryCheckCalculator(final int chunkSize,
final StreamingRangeType streamingRangeType) {
+ super(chunkSize, streamingRangeType);
+ }
+
+ @Override
+ protected Map<String, Object> readRecord(final ResultSet resultSet, final
ResultSetMetaData resultSetMetaData, final InventoryColumnValueReaderEngine
columnValueReaderEngine) throws SQLException {
+ Map<String, Object> result = new LinkedHashMap<>();
+ for (int columnIndex = 1, columnCount =
resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
+ result.put(resultSetMetaData.getColumnLabel(columnIndex),
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
+ }
+ return result;
+ }
+
+ @Override
+ protected Object getFirstUniqueKeyValue(final Map<String, Object> record,
final String firstUniqueKey) {
+ return DataConsistencyCheckUtils.getFirstUniqueKeyValue(record,
firstUniqueKey);
+ }
+
+ @Override
+ protected SingleTableInventoryCalculatedResult
convertRecordsToResult(final List<Map<String, Object>> records, final Object
maxUniqueKeyValue) {
+ return new
RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records);
+ }
+}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
similarity index 90%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
rename to
test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
index 2b397612e4d..beb5247b4dd 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
@@ -50,7 +50,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-class RecordSingleTableInventoryCalculatorTest {
+class RecordSingleTableInventoryCheckCalculatorTest {
private static PipelineDataSource dataSource;
@@ -105,7 +105,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfRangeQueryFromBeginWithOrderIdUniqueKey(final String
streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(0,
false, null), 4, 4);
@@ -114,7 +114,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfRangeQueryFromMiddleWithOrderIdUniqueKey(final
String streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(4,
false, null), 4, 8);
@@ -123,14 +123,14 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfRangeQueryWithMultiColumnUniqueKeys(final String
streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
true, 6), 8, 6);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
false, 6), 3, 6);
}
- private void assertQueryRangeCalculatedResult(final
RecordSingleTableInventoryCalculator calculator, final
SingleTableInventoryCalculateParameter param, final QueryRange queryRange,
+ private void assertQueryRangeCalculatedResult(final
RecordSingleTableInventoryCheckCalculator calculator, final
SingleTableInventoryCalculateParameter param, final QueryRange queryRange,
final int
expectedRecordsCount, final int expectedMaxUniqueKeyValue) {
param.setQueryRange(queryRange);
Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
@@ -151,7 +151,7 @@ class RecordSingleTableInventoryCalculatorTest {
connection.createStatement().execute(
"INSERT INTO test3 (user_id,order_id,status) VALUES
(3,1,'ok'),(3,2,'ok'),(4,3,'ok'),(4,4,'ok'),(5,5,'ok'),(5,6,'ok'),(6,7,'ok'),(6,8,'ok'),(7,9,'ok')");
}
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test3"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
true, 4), 4, 4);
@@ -162,7 +162,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfReservedRangeQueryWithMultiColumnUniqueKeys(final
String streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(3, true, 2));
@@ -181,7 +181,7 @@ class RecordSingleTableInventoryCalculatorTest {
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test1"),
Collections.emptyList(), buildUserIdUniqueKey(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(0, false, null));
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
Optional<SingleTableInventoryCalculatedResult> calculateResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
assertFalse(calculateResult.isPresent());
@@ -197,7 +197,7 @@ class RecordSingleTableInventoryCalculatorTest {
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test2"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
Optional<SingleTableInventoryCalculatedResult> calculateResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
assertFalse(calculateResult.isPresent());
@@ -206,7 +206,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"100,SMALL", "2,SMALL", "3,SMALL", "100,LARGE", "2,LARGE",
"3,LARGE"})
void assertCalculateOfRangeQueryAllWithOrderIdUniqueKeyWith3x(final int
streamingChunkCount, final String streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
@@ -244,7 +244,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void
assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith50x100(final String
streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(50, 100,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(50, 100,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
@@ -270,7 +270,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"100,SMALL", "2,SMALL", "3,SMALL", "100,LARGE", "2,LARGE",
"3,LARGE"})
void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith3x(final
int streamingChunkCount, final String streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
@@ -308,7 +308,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"100,SMALL", "3,SMALL", "4,SMALL", "100,LARGE", "3,LARGE",
"4,LARGE"})
void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith2x(final
int streamingChunkCount, final String streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(2, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(2, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
@@ -355,7 +355,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfPointQuery(final String streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.POINT_QUERY, null);
param.setUniqueKeysValues(Arrays.asList(3, 3));
@@ -371,7 +371,7 @@ class RecordSingleTableInventoryCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfPointRangeQuery(final String streamingRangeType) {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
+ RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildUserIdUniqueKey(),
QueryType.POINT_QUERY, null);
param.setUniqueKeysValues(Collections.singleton(3));