This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 05b4531159e Support unique key first big integer column splitting
(#37574)
05b4531159e is described below
commit 05b4531159e71e91b9b9402c8c61a7ea59477577
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Dec 29 21:10:03 2025 +0800
Support unique key first big integer column splitting (#37574)
* Support unique key first big integer column splitting
* Update RELEASE-NOTES.md
---
RELEASE-NOTES.md | 5 +++--
.../type/pk/PrimaryKeyIngestPositionFactory.java | 12 ++++++++++--
.../type/pk/type/IntegerPrimaryKeyIngestPosition.java | 14 ++++++++------
.../InventoryPositionEstimatedCalculator.java | 2 +-
.../position/exact/IntegerPositionHandler.java | 15 +++++++++------
.../type/pk/PrimaryKeyIngestPositionFactoryTest.java | 15 ++++++++-------
.../pk/type/IntegerPrimaryKeyIngestPositionTest.java | 8 +++++---
.../job/progress/TransmissionJobItemProgressTest.java | 3 ++-
.../InventoryPositionEstimatedCalculatorTest.java | 14 +++++++++-----
.../cdc/util/DataRecordResultConvertUtilsTest.java | 3 ++-
.../context/ConsistencyCheckJobItemContextTest.java | 18 ++++++++++++------
.../InventoryIntegerPositionExactCalculatorTest.java | 17 +++++++++++------
.../inventory/splitter/InventoryTaskSplitterTest.java | 9 +++++----
.../data/pipeline/core/task/InventoryTaskTest.java | 3 ++-
14 files changed, 87 insertions(+), 51 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 97afee67392..a4001cb589c 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -65,10 +65,11 @@
1. Pipeline: Support pipeline job realtime reflection on proxy global
properties after restarting -
[#36749](https://github.com/apache/shardingsphere/pull/36749)
1. Pipeline: InventoryDumper reuse table inventory calculator for better
function and performance -
[#36830](https://github.com/apache/shardingsphere/pull/36830)
1. Pipeline: Improve "alter transmission rule": verify STREAM_CHANNEL TYPE
NAME - [#36864](https://github.com/apache/shardingsphere/pull/36864)
-1. Pipeline: InventoryDumperContextSplitter supports multi-columns unique key
first integer column splitting -
[#36935](https://github.com/apache/shardingsphere/pull/36935)
+1. Pipeline: Support multi-columns unique key first integer column splitting -
[#36935](https://github.com/apache/shardingsphere/pull/36935)
1. Pipeline: Support unique key first integer column exact splitting -
[#37517](https://github.com/apache/shardingsphere/pull/37517)
-1. Pipeline: Improve InventoryPositionEstimatedCalculator: support possible
null unique key value -
[#37522](https://github.com/apache/shardingsphere/pull/37522)
+1. Pipeline: Support unique key first integer column possible null value -
[#37522](https://github.com/apache/shardingsphere/pull/37522)
1. Pipeline: Support unique key first integer column exact or estimated
splitting based on data sparseness -
[#37542](https://github.com/apache/shardingsphere/pull/37542)
+1. Pipeline: Support unique key first big integer column splitting -
[#37574](https://github.com/apache/shardingsphere/pull/37574)
1. Pipeline: Support unique key first string column exact splitting -
[#37543](https://github.com/apache/shardingsphere/pull/37543)
1. Encrypt: Support handling show create view result decoration in encrypt -
[#37299](https://github.com/apache/shardingsphere/pull/37299)
1. JDBC: Enhance ResultSetUtils to support flexible string date/time
conversions - [37424](https://github.com/apache/shardingsphere/pull/37424)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
index da57e51d894..c2ba6deabdd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import java.math.BigInteger;
import java.util.List;
/**
@@ -50,7 +51,7 @@ public final class PrimaryKeyIngestPositionFactory {
String endValue = parts.get(2);
switch (type) {
case 'i':
- return new
IntegerPrimaryKeyIngestPosition(Strings.isNullOrEmpty(beginValue) ? null :
Long.parseLong(beginValue), Strings.isNullOrEmpty(endValue) ? null :
Long.parseLong(endValue));
+ return new
IntegerPrimaryKeyIngestPosition(Strings.isNullOrEmpty(beginValue) ? null : new
BigInteger(beginValue), Strings.isNullOrEmpty(endValue) ? null : new
BigInteger(endValue));
case 's':
return new StringPrimaryKeyIngestPosition(beginValue,
endValue);
case 'u':
@@ -69,7 +70,7 @@ public final class PrimaryKeyIngestPositionFactory {
*/
public static PrimaryKeyIngestPosition<?> newInstance(final Object
beginValue, final Object endValue) {
if (beginValue instanceof Number) {
- return new IntegerPrimaryKeyIngestPosition(((Number)
beginValue).longValue(), null == endValue ? Long.MAX_VALUE : ((Number)
endValue).longValue());
+ return new
IntegerPrimaryKeyIngestPosition(convertToBigInteger((Number) beginValue), null
== endValue ? null : convertToBigInteger((Number) endValue));
}
if (beginValue instanceof CharSequence) {
return new StringPrimaryKeyIngestPosition(beginValue.toString(),
null == endValue ? null : endValue.toString());
@@ -77,4 +78,11 @@ public final class PrimaryKeyIngestPositionFactory {
// TODO support more types, e.g. byte[] (MySQL varbinary)
return new UnsupportedKeyIngestPosition();
}
+
+ private static BigInteger convertToBigInteger(final Number number) {
+ if (number instanceof BigInteger) {
+ return (BigInteger) number;
+ }
+ return BigInteger.valueOf(number.longValue());
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
index cdbd21e37c7..c82121c2025 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
@@ -19,27 +19,29 @@ package
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.typ
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import java.math.BigInteger;
+
/**
* Integer primary key ingest position.
*/
-public final class IntegerPrimaryKeyIngestPosition implements
PrimaryKeyIngestPosition<Long> {
+public final class IntegerPrimaryKeyIngestPosition implements
PrimaryKeyIngestPosition<BigInteger> {
- private final Long beginValue;
+ private final BigInteger beginValue;
- private final Long endValue;
+ private final BigInteger endValue;
- public IntegerPrimaryKeyIngestPosition(final Long beginValue, final Long
endValue) {
+ public IntegerPrimaryKeyIngestPosition(final BigInteger beginValue, final
BigInteger endValue) {
this.beginValue = beginValue;
this.endValue = endValue;
}
@Override
- public Long getBeginValue() {
+ public BigInteger getBeginValue() {
return beginValue;
}
@Override
- public Long getEndValue() {
+ public BigInteger getEndValue() {
return endValue;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 0fd2663af9f..7889168491e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -89,7 +89,7 @@ public final class InventoryPositionEstimatedCalculator {
IntegerRangeSplittingIterator rangeIterator = new
IntegerRangeSplittingIterator(lowerBound, upperBound, stepSize);
while (rangeIterator.hasNext()) {
Range<BigInteger> range = rangeIterator.next();
- result.add(new
IntegerPrimaryKeyIngestPosition(range.getLowerBound().longValue(),
range.getUpperBound().longValue()));
+ result.add(new
IntegerPrimaryKeyIngestPosition(range.getLowerBound(), range.getUpperBound()));
}
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
index f2126d99725..f401132e436 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
@@ -19,6 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -26,20 +28,21 @@ import java.sql.SQLException;
/**
* Integer position handler.
*/
-public final class IntegerPositionHandler implements
DataTypePositionHandler<Long> {
+public final class IntegerPositionHandler implements
DataTypePositionHandler<BigInteger> {
@Override
- public Long readColumnValue(final ResultSet resultSet, final int
columnIndex) throws SQLException {
- return resultSet.getLong(columnIndex);
+ public BigInteger readColumnValue(final ResultSet resultSet, final int
columnIndex) throws SQLException {
+ BigDecimal decimal = resultSet.getBigDecimal(columnIndex);
+ return null == decimal ? null : decimal.toBigInteger();
}
@Override
- public void setPreparedStatementValue(final PreparedStatement
preparedStatement, final int parameterIndex, final Long value) throws
SQLException {
- preparedStatement.setLong(parameterIndex, value);
+ public void setPreparedStatementValue(final PreparedStatement
preparedStatement, final int parameterIndex, final BigInteger value) throws
SQLException {
+ preparedStatement.setBigDecimal(parameterIndex, new BigDecimal(value));
}
@Override
- public IntegerPrimaryKeyIngestPosition createIngestPosition(final Long
lowerBound, final Long upperBound) {
+ public IntegerPrimaryKeyIngestPosition createIngestPosition(final
BigInteger lowerBound, final BigInteger upperBound) {
return new IntegerPrimaryKeyIngestPosition(lowerBound, upperBound);
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
index c8b310a38bf..a67a7034762 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import org.junit.jupiter.api.Test;
+import java.math.BigInteger;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -34,9 +35,9 @@ class PrimaryKeyIngestPositionFactoryTest {
@Test
void assertNewInstanceWithIntegerPrimaryKeyIngestPosition() {
-
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,100,200"),
new IntegerPrimaryKeyIngestPosition(100L, 200L));
-
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,100,"),
new IntegerPrimaryKeyIngestPosition(100L, null));
-
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,,200"),
new IntegerPrimaryKeyIngestPosition(null, 200L));
+
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,100,200"),
new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(100L),
BigInteger.valueOf(200L)));
+
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,100,"),
new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(100L), null));
+
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,,200"),
new IntegerPrimaryKeyIngestPosition(null, BigInteger.valueOf(200L)));
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,,"),
new IntegerPrimaryKeyIngestPosition(null, null));
}
@@ -74,16 +75,16 @@ class PrimaryKeyIngestPositionFactoryTest {
void assertNewInstanceWithNumberRange() {
IntegerPrimaryKeyIngestPosition actual =
(IntegerPrimaryKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance(100, 200);
assertThat(actual.getType(), is('i'));
- assertThat(actual.getBeginValue(), is(100L));
- assertThat(actual.getEndValue(), is(200L));
+ assertThat(actual.getBeginValue(), is(BigInteger.valueOf(100L)));
+ assertThat(actual.getEndValue(), is(BigInteger.valueOf(200L)));
}
@Test
void assertNewInstanceWithNumberNullEndRange() {
IntegerPrimaryKeyIngestPosition actual =
(IntegerPrimaryKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance(100, null);
assertThat(actual.getType(), is('i'));
- assertThat(actual.getBeginValue(), is(100L));
- assertThat(actual.getEndValue(), is(Long.MAX_VALUE));
+ assertThat(actual.getBeginValue(), is(BigInteger.valueOf(100L)));
+ assertNull(actual.getEndValue());
}
@Test
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
index e5e79c14979..0aad9848b0d 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
@@ -19,6 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.typ
import org.junit.jupiter.api.Test;
+import java.math.BigInteger;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -26,13 +28,13 @@ class IntegerPrimaryKeyIngestPositionTest {
@Test
void assertToString() {
- assertThat(new IntegerPrimaryKeyIngestPosition(1L, 100L).toString(),
is("i,1,100"));
+ assertThat(new IntegerPrimaryKeyIngestPosition(BigInteger.ONE,
BigInteger.valueOf(100L)).toString(), is("i,1,100"));
}
@Test
void assertToStringWithNullValue() {
- assertThat(new IntegerPrimaryKeyIngestPosition(1L, null).toString(),
is("i,1,"));
- assertThat(new IntegerPrimaryKeyIngestPosition(null, 100L).toString(),
is("i,,100"));
+ assertThat(new IntegerPrimaryKeyIngestPosition(BigInteger.ONE,
null).toString(), is("i,1,"));
+ assertThat(new IntegerPrimaryKeyIngestPosition(null,
BigInteger.valueOf(100L)).toString(), is("i,,100"));
assertThat(new IntegerPrimaryKeyIngestPosition(null, null).toString(),
is("i,,"));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
index f202cc8b5dd..723803a9def 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.infra.util.file.SystemResourceFileUtils;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.jupiter.api.Test;
+import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -84,7 +85,7 @@ class TransmissionJobItemProgressTest {
@Test
void assertGetProgressesCorrectly() {
Map<String, InventoryTaskProgress> progresses = new HashMap<>(4, 1F);
- progresses.put("ds.order_item#0", new InventoryTaskProgress(new
IntegerPrimaryKeyIngestPosition(1L, 100L)));
+ progresses.put("ds.order_item#0", new InventoryTaskProgress(new
IntegerPrimaryKeyIngestPosition(BigInteger.ONE, BigInteger.valueOf(100L))));
progresses.put("ds.order_item#1", new InventoryTaskProgress(new
UnsupportedKeyIngestPosition()));
progresses.put("ds.order#0", new InventoryTaskProgress(new
IngestFinishedPosition()));
progresses.put("ds.test_order#0", new InventoryTaskProgress(new
StringPrimaryKeyIngestPosition("1", "100")));
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
index f16feb375af..4e1d5a3d01e 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
@@ -35,8 +35,12 @@ class InventoryPositionEstimatedCalculatorTest {
void assertGetIntegerPositions() {
List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(600L)), 100L);
assertThat(actualPositions.size(), is(2));
- assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(1L, 300L));
- assertPosition(actualPositions.get(1), new
IntegerPrimaryKeyIngestPosition(301L, 600L));
+ assertPosition(actualPositions.get(0), createIntegerPosition(1L,
300L));
+ assertPosition(actualPositions.get(1), createIntegerPosition(301L,
600L));
+ }
+
+ private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long
beginValue, final long endValue) {
+ return new
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue),
BigInteger.valueOf(endValue));
}
private void assertPosition(final IngestPosition actual, final
IntegerPrimaryKeyIngestPosition expected) {
@@ -63,7 +67,7 @@ class InventoryPositionEstimatedCalculatorTest {
void assertGetIntegerPositionsWithTheSameMinMax() {
List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
Range.closed(BigInteger.valueOf(5L), BigInteger.valueOf(5L)), 100L);
assertThat(actualPositions.size(), is(1));
- assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(5L, 5L));
+ assertPosition(actualPositions.get(0), createIntegerPosition(5L, 5L));
}
@Test
@@ -74,7 +78,7 @@ class InventoryPositionEstimatedCalculatorTest {
BigInteger upperBound = BigInteger.valueOf(Long.MAX_VALUE);
List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
Range.closed(lowerBound, upperBound), shardingSize);
assertThat(actualPositions.size(), is(2));
- assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(lowerBound.longValue(), 0L));
- assertPosition(actualPositions.get(1), new
IntegerPrimaryKeyIngestPosition(1L, upperBound.longValue()));
+ assertPosition(actualPositions.get(0),
createIntegerPosition(lowerBound.longValue(), 0L));
+ assertPosition(actualPositions.get(1), createIntegerPosition(1L,
upperBound.longValue()));
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 8deef2250cb..5ee5d4aed7e 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.math.BigInteger;
import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.is;
@@ -38,7 +39,7 @@ class DataRecordResultConvertUtilsTest {
@ParameterizedTest
@MethodSource("dataChangeTypeTestCases")
void assertConvertDataRecordToRecordWithNonInsertTypes(final
PipelineSQLOperationType operationType, final Record.DataChangeType
expectedDataChangeType) throws InvalidProtocolBufferException {
- DataRecord dataRecord = new DataRecord(operationType, "test_schema",
"t_user", new IntegerPrimaryKeyIngestPosition(5L, 10L), 1);
+ DataRecord dataRecord = new DataRecord(operationType, "test_schema",
"t_user", new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(5L),
BigInteger.valueOf(10L)), 1);
dataRecord.addColumn(new NormalColumn("id", 1L, 2L, true, true));
dataRecord.setCommitTime(123L);
Record actualRecord =
DataRecordResultConvertUtils.convertDataRecordToRecord("logic_db",
"test_schema", dataRecord);
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
index 388d7560d56..0825de76307 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -27,6 +27,8 @@ import
org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
+import java.math.BigInteger;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -50,17 +52,21 @@ class ConsistencyCheckJobItemContextTest {
@Test
void assertConstructWithNonEmptyValues() {
ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, "H2");
- jobItemProgress.getTableCheckRangePositions().add(new
TableCheckRangePosition(0, DATA_NODE, TABLE, new
IntegerPrimaryKeyIngestPosition(1L, 100L),
- new IntegerPrimaryKeyIngestPosition(1L, 101L), null, 11, 11,
false, null));
- jobItemProgress.getTableCheckRangePositions().add(new
TableCheckRangePosition(1, DATA_NODE, TABLE, new
IntegerPrimaryKeyIngestPosition(101L, 200L),
- new IntegerPrimaryKeyIngestPosition(101L, 203L), null, 132,
132, false, null));
+ jobItemProgress.getTableCheckRangePositions().add(new
TableCheckRangePosition(0, DATA_NODE, TABLE, createIntegerPosition(1L, 100L),
+ createIntegerPosition(1L, 101L), null, 11, 11, false, null));
+ jobItemProgress.getTableCheckRangePositions().add(new
TableCheckRangePosition(1, DATA_NODE, TABLE, createIntegerPosition(101L, 200L),
+ createIntegerPosition(101L, 203L), null, 132, 132, false,
null));
ConsistencyCheckJobItemContext actual = new
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "",
"DATA_MATCH", null, databaseType),
0, JobStatus.RUNNING, jobItemProgress);
assertThat(actual.getProgressContext().getTableCheckRangePositions().size(),
is(2));
assertTableCheckRangePosition(actual.getProgressContext().getTableCheckRangePositions().get(0),
- new TableCheckRangePosition(0, DATA_NODE, TABLE, new
IntegerPrimaryKeyIngestPosition(1L, 100L), new
IntegerPrimaryKeyIngestPosition(1L, 101L), null, 11, 11, false, null));
+ new TableCheckRangePosition(0, DATA_NODE, TABLE,
createIntegerPosition(1L, 100L), createIntegerPosition(1L, 101L), null, 11, 11,
false, null));
assertTableCheckRangePosition(actual.getProgressContext().getTableCheckRangePositions().get(1),
- new TableCheckRangePosition(1, DATA_NODE, TABLE, new
IntegerPrimaryKeyIngestPosition(101L, 200L), new
IntegerPrimaryKeyIngestPosition(101L, 203L), null, 132, 132, false, null));
+ new TableCheckRangePosition(1, DATA_NODE, TABLE,
createIntegerPosition(101L, 200L), createIntegerPosition(101L, 203L), null,
132, 132, false, null));
+ }
+
+ private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long
beginValue, final long endValue) {
+ return new
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue),
BigInteger.valueOf(endValue));
}
private void assertTableCheckRangePosition(final TableCheckRangePosition
actual, final TableCheckRangePosition expected) {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
index 1ae08bb757d..4d3a1ad3591 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -84,10 +85,14 @@ class InventoryIntegerPositionExactCalculatorTest {
void assertGetPositionsWithOrderIdUniqueKey() {
List<IngestPosition> actual =
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null,
"t_order"), "order_id", 3, dataSource, new IntegerPositionHandler());
assertThat(actual.size(), is(4));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(0), new
IntegerPrimaryKeyIngestPosition(1L, 3L));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(1), new
IntegerPrimaryKeyIngestPosition(4L, 6L));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(2), new
IntegerPrimaryKeyIngestPosition(7L, 9L));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(3), new
IntegerPrimaryKeyIngestPosition(10L, 11L));
+ assertIntegerPrimaryKeyIngestPosition0(actual.get(0),
createIntegerPosition(1L, 3L));
+ assertIntegerPrimaryKeyIngestPosition0(actual.get(1),
createIntegerPosition(4L, 6L));
+ assertIntegerPrimaryKeyIngestPosition0(actual.get(2),
createIntegerPosition(7L, 9L));
+ assertIntegerPrimaryKeyIngestPosition0(actual.get(3),
createIntegerPosition(10L, 11L));
+ }
+
+ private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long
beginValue, final long endValue) {
+ return new
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue),
BigInteger.valueOf(endValue));
}
private void assertIntegerPrimaryKeyIngestPosition0(final IngestPosition
actual, final IntegerPrimaryKeyIngestPosition expected) {
@@ -102,7 +107,7 @@ class InventoryIntegerPositionExactCalculatorTest {
void assertGetPositionsWithMultiColumnUniqueKeys() {
List<IngestPosition> actual =
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null,
"t_order"), "user_id", 3, dataSource, new IntegerPositionHandler());
assertThat(actual.size(), is(2));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(0), new
IntegerPrimaryKeyIngestPosition(1L, 3L));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(1), new
IntegerPrimaryKeyIngestPosition(4L, 6L));
+ assertIntegerPrimaryKeyIngestPosition0(actual.get(0),
createIntegerPosition(1L, 3L));
+ assertIntegerPrimaryKeyIngestPosition0(actual.get(1),
createIntegerPosition(4L, 6L));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
index 6df164000d5..051a6f5ad5c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.sql.DataSource;
+import java.math.BigInteger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -99,8 +100,8 @@ class InventoryTaskSplitterTest {
List<InventoryTask> actual =
inventoryTaskSplitter.split(jobItemContext);
assertThat(actual.size(), is(10));
InventoryTask task = actual.get(9);
- assertThat(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getBeginValue(), is(91L));
- assertThat(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getEndValue(), is(100L));
+ assertThat(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getBeginValue(),
is(BigInteger.valueOf(91L)));
+ assertThat(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getEndValue(),
is(BigInteger.valueOf(100L)));
}
@Test
@@ -110,8 +111,8 @@ class InventoryTaskSplitterTest {
assertThat(actual.size(), is(1));
assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0"));
IntegerPrimaryKeyIngestPosition keyPosition =
(IntegerPrimaryKeyIngestPosition) actual.get(0).getTaskProgress().getPosition();
- assertThat(keyPosition.getBeginValue(), is(1L));
- assertThat(keyPosition.getEndValue(), is(999L));
+ assertThat(keyPosition.getBeginValue(), is(BigInteger.ONE));
+ assertThat(keyPosition.getEndValue(), is(BigInteger.valueOf(999L)));
}
@Test
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 368f3dc1852..030aec608cc 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.math.BigInteger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -100,7 +101,7 @@ class InventoryTaskTest {
result.setActualTableName(actualTableName);
result.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
result.getCommonContext().setPosition(null ==
taskConfig.getDumperContext().getCommonContext().getPosition()
- ? new IntegerPrimaryKeyIngestPosition(0L, 1000L)
+ ? new IntegerPrimaryKeyIngestPosition(BigInteger.ONE,
BigInteger.valueOf(1000L))
:
taskConfig.getDumperContext().getCommonContext().getPosition());
return result;
}