This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4488af3ad11 Review and refactor data consistency check (#28058)
4488af3ad11 is described below
commit 4488af3ad11356237e58559f20b77fb643e22a63
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Aug 13 17:18:44 2023 +0800
Review and refactor data consistency check (#28058)
---
.../DataConsistencyCheckUtils.java | 1 +
...RecordSingleTableInventoryCalculatedResult.java | 3 -
.../result/TableDataConsistencyCheckResult.java | 10 +-
.../TableDataConsistencyCountCheckResult.java | 15 +--
.../yaml/YamlTableDataConsistencyCheckResult.java | 24 ++++-
...YamlTableDataConsistencyCheckResultSwapper.java | 1 -
.../table/MatchingTableDataConsistencyChecker.java | 28 ++---
.../RecordSingleTableInventoryCalculator.java | 12 ++-
.../ConsistencyCheckDataBuilder.java | 115 +++++++++++++++++++++
...rdSingleTableInventoryCalculatedResultTest.java | 78 +-------------
.../h2/sqlbuilder}/H2PipelineSQLBuilder.java | 2 +-
...peline.spi.sqlbuilder.DialectPipelineSQLBuilder | 2 +-
.../RecordSingleTableInventoryCalculatorTest.java | 69 +++++--------
13 files changed, 210 insertions(+), 150 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
index 60bc9f44741..adb79c32554 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/DataConsistencyCheckUtils.java
@@ -67,6 +67,7 @@ public final class DataConsistencyCheckUtils {
@SneakyThrows(SQLException.class)
private static boolean isMatched(final EqualsBuilder equalsBuilder, final
Object thisColumnValue, final Object thatColumnValue) {
+ equalsBuilder.reset();
if (thisColumnValue instanceof SQLXML && thatColumnValue instanceof
SQLXML) {
return ((SQLXML) thisColumnValue).getString().equals(((SQLXML)
thatColumnValue).getString());
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
index 1253503bf12..1db10f4352b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
-import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -41,7 +40,6 @@ public final class RecordSingleTableInventoryCalculatedResult
implements SingleT
private final int recordsCount;
- @Getter(AccessLevel.NONE)
private final List<Map<String, Object>> records;
public RecordSingleTableInventoryCalculatedResult(final Object
maxUniqueKeyValue, final List<Map<String, Object>> records) {
@@ -77,7 +75,6 @@ public final class RecordSingleTableInventoryCalculatedResult
implements SingleT
Iterator<Map<String, Object>> thisRecordsIterator = records.iterator();
Iterator<Map<String, Object>> thatRecordsIterator =
that.records.iterator();
while (thisRecordsIterator.hasNext() && thatRecordsIterator.hasNext())
{
- equalsBuilder.reset();
Map<String, Object> thisRecord = thisRecordsIterator.next();
Map<String, Object> thatRecord = thatRecordsIterator.next();
if (thisRecord.size() != thatRecord.size()) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
index c768c3c88b5..7eec6445fbc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCheckResult.java
@@ -27,22 +27,22 @@ import lombok.ToString;
@ToString
public final class TableDataConsistencyCheckResult {
- private final TableDataConsistencyCheckIgnoredType ignoredType;
-
private final TableDataConsistencyCountCheckResult countCheckResult;
private final TableDataConsistencyContentCheckResult contentCheckResult;
+ private final TableDataConsistencyCheckIgnoredType ignoredType;
+
public TableDataConsistencyCheckResult(final
TableDataConsistencyCountCheckResult countCheckResult, final
TableDataConsistencyContentCheckResult contentCheckResult) {
- ignoredType = null;
this.countCheckResult = countCheckResult;
this.contentCheckResult = contentCheckResult;
+ ignoredType = null;
}
public TableDataConsistencyCheckResult(final
TableDataConsistencyCheckIgnoredType ignoredType) {
+ countCheckResult = new TableDataConsistencyCountCheckResult(-1, -1);
+ contentCheckResult = new TableDataConsistencyContentCheckResult(false);
this.ignoredType = ignoredType;
- countCheckResult = null;
- contentCheckResult = null;
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
index 81dd4252319..aaf2b5ef02c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableDataConsistencyCountCheckResult.java
@@ -18,11 +18,13 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.ToString;
/**
* Table data consistency count check result.
*/
+@RequiredArgsConstructor
@Getter
@ToString
public final class TableDataConsistencyCountCheckResult {
@@ -31,11 +33,12 @@ public final class TableDataConsistencyCountCheckResult {
private final long targetRecordsCount;
- private final boolean matched;
-
- public TableDataConsistencyCountCheckResult(final long sourceRecordsCount,
final long targetRecordsCount) {
- this.sourceRecordsCount = sourceRecordsCount;
- this.targetRecordsCount = targetRecordsCount;
- matched = sourceRecordsCount == targetRecordsCount;
+ /**
+ * Is matched.
+ *
+ * @return true if records count equals between source and target
+ */
+ public boolean isMatched() {
+ return sourceRecordsCount == targetRecordsCount && sourceRecordsCount
>= 0;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
index 9c14939bd39..6dc803ec3ed 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java
@@ -17,13 +17,16 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml;
+import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
* Yaml table data consistency check result config.
*/
+@NoArgsConstructor
@Getter
@Setter
public final class YamlTableDataConsistencyCheckResult implements
YamlConfiguration {
@@ -34,6 +37,11 @@ public final class YamlTableDataConsistencyCheckResult
implements YamlConfigurat
private String ignoredType;
+ public YamlTableDataConsistencyCheckResult(final
YamlTableDataConsistencyCountCheckResult countCheckResult, final
YamlTableDataConsistencyContentCheckResult contentCheckResult) {
+ this.countCheckResult = countCheckResult;
+ this.contentCheckResult = contentCheckResult;
+ }
+
/**
* YAML table data consistency count result.
*/
@@ -45,12 +53,26 @@ public final class YamlTableDataConsistencyCheckResult
implements YamlConfigurat
private long targetRecordsCount;
- private boolean matched;
+ /**
+ * Add records count.
+ *
+ * @param delta delta count
+ * @param onSource add on source or target
+ */
+ public void addRecordsCount(final long delta, final boolean onSource) {
+ if (onSource) {
+ sourceRecordsCount += delta;
+ } else {
+ targetRecordsCount += delta;
+ }
+ }
}
/**
* YAML table data consistency content result.
*/
+ @NoArgsConstructor
+ @AllArgsConstructor
@Getter
@Setter
public static class YamlTableDataConsistencyContentCheckResult implements
YamlConfiguration {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
index 619eeaa95da..4003f101813 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResultSwapper.java
@@ -42,7 +42,6 @@ public final class YamlTableDataConsistencyCheckResultSwapper
implements YamlCon
YamlTableDataConsistencyCountCheckResult countCheckResult = new
YamlTableDataConsistencyCountCheckResult();
countCheckResult.setSourceRecordsCount(data.getCountCheckResult().getSourceRecordsCount());
countCheckResult.setTargetRecordsCount(data.getCountCheckResult().getTargetRecordsCount());
- countCheckResult.setMatched(data.getContentCheckResult().isMatched());
result.setCountCheckResult(countCheckResult);
YamlTableDataConsistencyContentCheckResult contentCheckResult = new
YamlTableDataConsistencyContentCheckResult();
contentCheckResult.setMatched(data.getContentCheckResult().isMatched());
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
index cb388404dc4..b88f7bf2d17 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
@@ -23,8 +23,10 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyContentCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult.YamlTableDataConsistencyCountCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
@@ -82,19 +84,17 @@ public abstract class MatchingTableDataConsistencyChecker
implements TableDataCo
private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
final TableDataConsistencyCheckParameter param, final ThreadPoolExecutor
executor) {
- long sourceRecordsCount = 0;
- long targetRecordsCount = 0;
- boolean contentMatched = true;
+ YamlTableDataConsistencyCheckResult checkResult = new
YamlTableDataConsistencyCheckResult(new
YamlTableDataConsistencyCountCheckResult(), new
YamlTableDataConsistencyContentCheckResult(true));
while (sourceCalculatedResults.hasNext() &&
targetCalculatedResults.hasNext()) {
if (null != param.getReadRateLimitAlgorithm()) {
param.getReadRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
}
SingleTableInventoryCalculatedResult sourceCalculatedResult =
waitFuture(executor.submit(sourceCalculatedResults::next));
SingleTableInventoryCalculatedResult targetCalculatedResult =
waitFuture(executor.submit(targetCalculatedResults::next));
- sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
- targetRecordsCount += targetCalculatedResult.getRecordsCount();
- contentMatched = Objects.equals(sourceCalculatedResult,
targetCalculatedResult);
- if (!contentMatched) {
+
checkResult.getCountCheckResult().addRecordsCount(sourceCalculatedResult.getRecordsCount(),
true);
+
checkResult.getCountCheckResult().addRecordsCount(targetCalculatedResult.getRecordsCount(),
false);
+ if (!Objects.equals(sourceCalculatedResult,
targetCalculatedResult)) {
+ checkResult.getContentCheckResult().setMatched(false);
log.info("content matched false, jobId={}, sourceTable={},
targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(),
param.getTargetTable(), param.getUniqueKeys());
break;
}
@@ -108,12 +108,16 @@ public abstract class MatchingTableDataConsistencyChecker
implements TableDataCo
}
if (sourceCalculatedResults.hasNext()) {
// TODO Refactor SingleTableInventoryCalculatedResult to represent
inaccurate number
- return new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(sourceRecordsCount + 1,
targetRecordsCount), new TableDataConsistencyContentCheckResult(false));
+ checkResult.getCountCheckResult().addRecordsCount(1, true);
+ checkResult.getContentCheckResult().setMatched(false);
+ return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
if (targetCalculatedResults.hasNext()) {
- return new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount +
1), new TableDataConsistencyContentCheckResult(false));
+ checkResult.getCountCheckResult().addRecordsCount(1, false);
+ checkResult.getContentCheckResult().setMatched(false);
+ return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
- return new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount),
new TableDataConsistencyContentCheckResult(contentMatched));
+ return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
// TODO use digest (crc32, murmurhash)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index a3d6b811f3f..abcbc0e2143 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -122,10 +122,7 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
preparedStatement.setFetchSize(chunkSize);
}
calculationContext.setPreparedStatement(preparedStatement);
- Object tableCheckPosition = param.getTableCheckPosition();
- if (null != tableCheckPosition) {
- preparedStatement.setObject(1, tableCheckPosition);
- }
+ setParameters(preparedStatement, param);
ResultSet resultSet = preparedStatement.executeQuery();
calculationContext.setResultSet(resultSet);
}
@@ -140,6 +137,13 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
return
pipelineSQLBuilder.buildQueryAllOrderingSQL(param.getSchemaName(),
param.getLogicTableName(), columnNames, param.getFirstUniqueKey().getName(),
firstQuery);
}
+ private void setParameters(final PreparedStatement preparedStatement,
final SingleTableInventoryCalculateParameter param) throws SQLException {
+ Object tableCheckPosition = param.getTableCheckPosition();
+ if (null != tableCheckPosition) {
+ preparedStatement.setObject(1, tableCheckPosition);
+ }
+ }
+
@RequiredArgsConstructor
private static final class CalculationContext implements AutoCloseable {
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
new file mode 100644
index 00000000000..9284bdff825
--- /dev/null
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckDataBuilder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public final class ConsistencyCheckDataBuilder {
+
+ /**
+ * Build fixed full type record.
+ *
+ * @param id id
+ * @return built record
+ */
+ public static Map<String, Object> buildFixedFullTypeRecord(final int id) {
+ Map<String, Object> result = new LinkedHashMap<>();
+ result.put("id", id);
+ result.put("c_bool", true);
+ result.put("c_int1", Byte.MAX_VALUE);
+ result.put("c_int2", Short.MAX_VALUE);
+ result.put("c_int4", Integer.MAX_VALUE);
+ result.put("c_int8", Long.MAX_VALUE);
+ result.put("c_float", 1.23F);
+ result.put("c_double", 2.3456D);
+ result.put("c_decimal", BigDecimal.valueOf(1.23456789D));
+ result.put("c_varchar", "ok");
+ result.put("c_time", new Time(123456789L));
+ result.put("c_date", new Date(123456789L));
+ result.put("c_timestamp", new Timestamp(123456789L));
+ result.put("c_array", new int[]{1, 2, 3});
+ result.put("c_blob", null);
+ return result;
+ }
+
+ /**
+ * Modify column value randomly.
+ *
+ * @param record record
+ * @param key which key will be modified
+ * @return original record
+ */
+ public static Map<String, Object> modifyColumnValueRandomly(final
Map<String, Object> record, final String key) {
+ Object value = record.get(key);
+ record.put(key, getModifiedValue(value));
+ return record;
+ }
+
+ private static Object getModifiedValue(final Object value) {
+ if (null == value) {
+ return new Object();
+ }
+ if (value instanceof Boolean) {
+ return !((Boolean) value);
+ }
+ if (value instanceof Byte) {
+ return (byte) ((Byte) value - 1);
+ }
+ if (value instanceof Short) {
+ return (short) ((Short) value - 1);
+ }
+ if (value instanceof Integer) {
+ return (Integer) value - 1;
+ }
+ if (value instanceof Long) {
+ return (Long) value - 1L;
+ }
+ if (value instanceof Float) {
+ return (Float) value - 1F;
+ }
+ if (value instanceof Double) {
+ return (Double) value - 1D;
+ }
+ if (value instanceof BigDecimal) {
+ return ((BigDecimal) value).subtract(BigDecimal.ONE);
+ }
+ if (value instanceof String) {
+ return value + "-";
+ }
+ if (value instanceof Time) {
+ return new Time(((Time) value).getTime() - 1);
+ }
+ if (value instanceof Date) {
+ return new Date(((Date) value).getTime() - 1);
+ }
+ if (value instanceof Timestamp) {
+ return new Timestamp(((Timestamp) value).getTime() - 1);
+ }
+ if (value instanceof int[]) {
+ int[] result = ((int[]) value).clone();
+ result[0] = result[0] - 1;
+ return result;
+ }
+ return value;
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
index dc2b86e2f81..7603b8de762 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
@@ -17,16 +17,13 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckDataBuilder;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.math.RoundingMode;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
@@ -49,25 +46,6 @@ class RecordSingleTableInventoryCalculatedResultTest {
assertThat(actual, is(expected));
}
- private Map<String, Object> buildFixedFullTypeRecord() {
- Map<String, Object> result = new LinkedHashMap<>();
- result.put("c_bool", true);
- result.put("c_int1", Byte.MAX_VALUE);
- result.put("c_int2", Short.MAX_VALUE);
- result.put("c_int4", Integer.MAX_VALUE);
- result.put("c_int8", Long.MAX_VALUE);
- result.put("c_float", 1.23F);
- result.put("c_double", 2.3456D);
- result.put("c_decimal", BigDecimal.valueOf(1.23456789D));
- result.put("c_varchar", "ok");
- result.put("c_time", new Time(123456789L));
- result.put("c_date", new Date(123456789L));
- result.put("c_timestamp", new Timestamp(123456789L));
- result.put("c_array", new int[]{1, 2, 3});
- result.put("c_blob", null);
- return result;
- }
-
@Test
void assertFullTypeRecordsEqualsWithDifferentDecimalScale() {
RecordSingleTableInventoryCalculatedResult expected = new
RecordSingleTableInventoryCalculatedResult(1000,
Collections.singletonList(buildFixedFullTypeRecord()));
@@ -107,57 +85,11 @@ class RecordSingleTableInventoryCalculatedResultTest {
});
}
- private Map<String, Object> modifyColumnValueRandomly(final Map<String,
Object> record, final String key) {
- Object value = record.get(key);
- record.put(key, getModifiedValue(value));
- return record;
+ private Map<String, Object> buildFixedFullTypeRecord() {
+ return ConsistencyCheckDataBuilder.buildFixedFullTypeRecord(1);
}
- private Object getModifiedValue(final Object value) {
- if (null == value) {
- return new Object();
- }
- if (value instanceof Boolean) {
- return !((Boolean) value);
- }
- if (value instanceof Byte) {
- return (Byte) value - 1;
- }
- if (value instanceof Short) {
- return (Short) value - 1;
- }
- if (value instanceof Integer) {
- return (Integer) value - 1;
- }
- if (value instanceof Long) {
- return (Long) value - 1;
- }
- if (value instanceof Float) {
- return (Float) value - 1;
- }
- if (value instanceof Double) {
- return (Double) value - 1;
- }
- if (value instanceof BigDecimal) {
- return ((BigDecimal) value).subtract(BigDecimal.ONE);
- }
- if (value instanceof String) {
- return value + "-";
- }
- if (value instanceof Time) {
- return new Time(((Time) value).getTime() - 1);
- }
- if (value instanceof Date) {
- return new Date(((Date) value).getTime() - 1);
- }
- if (value instanceof Timestamp) {
- return new Timestamp(((Timestamp) value).getTime() - 1);
- }
- if (value instanceof int[]) {
- int[] result = ((int[]) value).clone();
- result[0] = result[0] - 1;
- return result;
- }
- return value;
+ private Map<String, Object> modifyColumnValueRandomly(final Map<String,
Object> record, final String key) {
+ return ConsistencyCheckDataBuilder.modifyColumnValueRandomly(record,
key);
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/sqlbuilder/H2PipelineSQLBuilder.java
similarity index 94%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
rename to
kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/sqlbuilder/H2PipelineSQLBuilder.java
index e50a85f8c89..6025f61e06b 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2PipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/sqlbuilder/H2PipelineSQLBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.h2.sqlbuilder;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder;
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
b/kernel/data-pipeline/dialect/h2/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
similarity index 90%
rename from
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
rename to
kernel/data-pipeline/dialect/h2/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
index 582509d7fc4..525cc3bb507 100644
---
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
+++
b/kernel/data-pipeline/dialect/h2/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.DialectPipelineSQLBuilder
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.H2PipelineSQLBuilder
+org.apache.shardingsphere.data.pipeline.h2.sqlbuilder.H2PipelineSQLBuilder
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index 14c470df844..6b4c14e1f71 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.test.it.data.pipeline.core.consistencycheck.table.calculator;
import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
@@ -29,7 +30,6 @@ import
org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.mockito.internal.configuration.plugins.Plugins;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -45,22 +45,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class RecordSingleTableInventoryCalculatorTest {
- private static PipelineDataSourceWrapper source;
-
- private static PipelineDataSourceWrapper target;
+ private static PipelineDataSourceWrapper dataSource;
@BeforeAll
static void setUp() throws Exception {
- source = new
PipelineDataSourceWrapper(createHikariDataSource("source_ds"),
TypedSPILoader.getService(DatabaseType.class, "H2"));
- createTableAndInitData(source, "t_order_copy");
- target = new
PipelineDataSourceWrapper(createHikariDataSource("target_ds"),
TypedSPILoader.getService(DatabaseType.class, "H2"));
- createTableAndInitData(target, "t_order");
+ dataSource = new
PipelineDataSourceWrapper(createHikariDataSource("calc_" +
RandomStringUtils.randomAlphanumeric(9)),
TypedSPILoader.getService(DatabaseType.class, "H2"));
+ createTableAndInitData(dataSource);
}
@AfterAll
static void tearDown() throws Exception {
- source.close();
- target.close();
+ dataSource.close();
}
private static HikariDataSource createHikariDataSource(final String
databaseName) {
@@ -75,11 +70,11 @@ class RecordSingleTableInventoryCalculatorTest {
return result;
}
- private static void createTableAndInitData(final PipelineDataSourceWrapper
dataSource, final String tableName) throws SQLException {
+ private static void createTableAndInitData(final PipelineDataSourceWrapper
dataSource) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- String sql = String.format("CREATE TABLE %s (order_id INT NOT
NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id))",
tableName);
+ String sql = "CREATE TABLE t_order (order_id INT PRIMARY KEY,
user_id INT NOT NULL, status VARCHAR(12))";
connection.createStatement().execute(sql);
- PreparedStatement preparedStatement =
connection.prepareStatement(String.format("INSERT INTO %s (order_id, user_id,
status) VALUES (?, ?, ?)", tableName));
+ PreparedStatement preparedStatement =
connection.prepareStatement("INSERT INTO t_order (order_id, user_id, status)
VALUES (?, ?, ?)");
for (int i = 0; i < 10; i++) {
preparedStatement.setInt(1, i + 1);
preparedStatement.setInt(2, i + 1);
@@ -90,41 +85,29 @@ class RecordSingleTableInventoryCalculatorTest {
}
@Test
- void assertCalculateFromBegin() throws ReflectiveOperationException {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000);
-
Plugins.getMemberAccessor().set(RecordSingleTableInventoryCalculator.class.getDeclaredField("chunkSize"),
calculator, 5);
- SingleTableInventoryCalculateParameter sourceParam =
generateParameter(source, "t_order_copy", 0);
- Optional<SingleTableInventoryCalculatedResult> sourceCalculateResult =
calculator.calculateChunk(sourceParam);
- SingleTableInventoryCalculateParameter targetParam =
generateParameter(target, "t_order", 0);
- Optional<SingleTableInventoryCalculatedResult> targetCalculateResult =
calculator.calculateChunk(targetParam);
- assertTrue(sourceCalculateResult.isPresent());
- assertTrue(targetCalculateResult.isPresent());
-
assertTrue(sourceCalculateResult.get().getMaxUniqueKeyValue().isPresent());
-
assertTrue(targetCalculateResult.get().getMaxUniqueKeyValue().isPresent());
- assertThat(sourceCalculateResult.get().getMaxUniqueKeyValue().get(),
is(targetCalculateResult.get().getMaxUniqueKeyValue().get()));
- assertThat(targetCalculateResult.get().getMaxUniqueKeyValue().get(),
is(5L));
- assertThat(sourceCalculateResult.get(),
is(targetCalculateResult.get()));
+ void assertCalculateOfAllQueryFromBegin() {
+ RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(5);
+ SingleTableInventoryCalculateParameter param =
generateParameter(dataSource, 0);
+ Optional<SingleTableInventoryCalculatedResult> calculateResult =
calculator.calculateChunk(param);
+ assertTrue(calculateResult.isPresent());
+ SingleTableInventoryCalculatedResult actual = calculateResult.get();
+ assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+ assertThat(actual.getMaxUniqueKeyValue().get(), is(5L));
}
@Test
- void assertCalculateFromMiddle() throws ReflectiveOperationException {
- RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000);
-
Plugins.getMemberAccessor().set(RecordSingleTableInventoryCalculator.class.getDeclaredField("chunkSize"),
calculator, 5);
- SingleTableInventoryCalculateParameter sourceParam =
generateParameter(source, "t_order_copy", 5);
- Optional<SingleTableInventoryCalculatedResult> sourceCalculateResult =
calculator.calculateChunk(sourceParam);
- SingleTableInventoryCalculateParameter targetParam =
generateParameter(target, "t_order", 5);
- Optional<SingleTableInventoryCalculatedResult> targetCalculateResult =
calculator.calculateChunk(targetParam);
- assertTrue(sourceCalculateResult.isPresent());
- assertTrue(targetCalculateResult.isPresent());
-
assertTrue(sourceCalculateResult.get().getMaxUniqueKeyValue().isPresent());
-
assertTrue(targetCalculateResult.get().getMaxUniqueKeyValue().isPresent());
- assertThat(sourceCalculateResult.get().getMaxUniqueKeyValue().get(),
is(targetCalculateResult.get().getMaxUniqueKeyValue().get()));
- assertThat(targetCalculateResult.get().getMaxUniqueKeyValue().get(),
is(10L));
- assertThat(sourceCalculateResult.get(),
is(targetCalculateResult.get()));
+ void assertCalculateOfAllQueryFromMiddle() {
+ RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(5);
+ SingleTableInventoryCalculateParameter param =
generateParameter(dataSource, 5);
+ Optional<SingleTableInventoryCalculatedResult> calculateResult =
calculator.calculateChunk(param);
+ assertTrue(calculateResult.isPresent());
+ SingleTableInventoryCalculatedResult actual = calculateResult.get();
+ assertTrue(actual.getMaxUniqueKeyValue().isPresent());
+ assertThat(actual.getMaxUniqueKeyValue().get(), is(10L));
}
- private SingleTableInventoryCalculateParameter generateParameter(final
PipelineDataSourceWrapper dataSource, final String logicTableName, final Object
dataCheckPosition) {
+ private SingleTableInventoryCalculateParameter generateParameter(final
PipelineDataSourceWrapper dataSource, final Object dataCheckPosition) {
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(new PipelineColumnMetaData(1, "order_id",
Types.INTEGER, "integer", false, true, true));
- return new SingleTableInventoryCalculateParameter(dataSource, new
SchemaTableName(null, logicTableName), Collections.emptyList(), uniqueKeys,
dataCheckPosition);
+ return new SingleTableInventoryCalculateParameter(dataSource, new
SchemaTableName(null, "t_order"), Collections.emptyList(), uniqueKeys,
dataCheckPosition);
}
}