This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 575d72ca783 Fix multi-column unique key migration ignoring the inventory
task when the first column type is numeric (#24263)
575d72ca783 is described below
commit 575d72ca78386882e41b487e997b0f8f4d0c7a33
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Feb 20 21:32:48 2023 +0800
Fix multi-column unique key migration ignoring the inventory task when the
first column type is numeric (#24263)
* Fix multi-column unique key migration ignoring the inventory task when the
first column is a numeric type.
* Fix CI
* Merge method
---
.../ingest/position/PrimaryKeyPositionFactory.java | 2 +-
.../core/ingest/dumper/InventoryDumper.java | 9 +++-----
.../primarykey/IndexesMigrationE2EIT.java | 25 ++++++++++++++--------
3 files changed, 20 insertions(+), 16 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
index cbbf3cd6faf..6e8bbabf9bb 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
@@ -63,7 +63,7 @@ public final class PrimaryKeyPositionFactory {
*/
public static IngestPosition<?> newInstance(final Object beginValue, final
Object endValue) {
if (beginValue instanceof Number) {
- return new IntegerPrimaryKeyPosition(((Number)
beginValue).longValue(), ((Number) endValue).longValue());
+ return new IntegerPrimaryKeyPosition(((Number)
beginValue).longValue(), null != endValue ? ((Number) endValue).longValue() :
Long.MAX_VALUE);
}
if (beginValue instanceof CharSequence) {
return new StringPrimaryKeyPosition(beginValue.toString(), null !=
endValue ? endValue.toString() : null);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 08f43c64f5b..395dc244917 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -140,12 +140,9 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
if (!dumperConfig.hasUniqueKey()) {
return sqlBuilder.buildNoUniqueKeyInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName());
}
+ PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>)
dumperConfig.getPosition();
PipelineColumnMetaData firstColumn =
dumperConfig.getUniqueKeyColumns().get(0);
- if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType())) {
- return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName(), firstColumn.getName());
- }
- if (PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
- PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>)
dumperConfig.getPosition();
+ if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) ||
PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != position.getBeginValue() && null !=
position.getEndValue()) {
return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName,
dumperConfig.getActualTableName(), firstColumn.getName());
}
@@ -162,7 +159,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
}
PipelineColumnMetaData firstColumn =
dumperConfig.getUniqueKeyColumns().get(0);
PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>)
dumperConfig.getPosition();
- if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType())) {
+ if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) &&
null != position.getBeginValue() && null != position.getEndValue()) {
preparedStatement.setObject(1, position.getBeginValue());
preparedStatement.setObject(2, position.getEndValue());
return;
diff --git
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index a1324bfb6a7..9750328847a 100644
---
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import
org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
@@ -33,6 +34,7 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
@@ -84,7 +86,7 @@ public final class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(),
consistencyCheckAlgorithmType);
}
@Test
@@ -97,7 +99,7 @@ public final class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(),
consistencyCheckAlgorithmType);
}
@Test
@@ -105,12 +107,12 @@ public final class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
String sql;
String consistencyCheckAlgorithmType;
if (getDatabaseType() instanceof MySQLDatabaseType) {
- sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL,
`user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY
(`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, `user_id`
INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
consistencyCheckAlgorithmType = "DATA_MATCH";
} else {
return;
}
- assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ assertMigrationSuccess(sql, new SnowflakeKeyGenerateAlgorithm(),
consistencyCheckAlgorithmType);
}
@Test
@@ -124,14 +126,13 @@ public final class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(),
consistencyCheckAlgorithmType);
}
- private void assertMigrationSuccess(final String sqlPattern, final String
consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
+ private void assertMigrationSuccess(final String sqlPattern, final
KeyGenerateAlgorithm generateAlgorithm, final String
consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
initEnvironment(getDatabaseType(), new MigrationJobType());
sourceExecuteWithLog(String.format(sqlPattern,
getSourceTableOrderName()));
try (Connection connection = getSourceDataSource().getConnection()) {
- KeyGenerateAlgorithm generateAlgorithm = new
UUIDKeyGenerateAlgorithm();
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
generateAlgorithm, getSourceTableOrderName(),
PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
}
addMigrationProcessConfig();
@@ -141,8 +142,14 @@ public final class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
String jobId = listJobId().get(0);
waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
- sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status)
VALUES ('a1', 1, 'OK')");
- assertProxyOrderRecordExist("t_order", "a1");
+ Comparable<?> primaryKey = generateAlgorithm.generateKey();
+ try (PreparedStatement preparedStatement =
getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order
(order_id,user_id,status) VALUES (?,?,?)")) {
+ preparedStatement.setObject(1, primaryKey);
+ preparedStatement.setObject(2, 1);
+ preparedStatement.setObject(3, "OK");
+ preparedStatement.execute();
+ }
+ assertProxyOrderRecordExist("t_order", primaryKey);
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'",
jobId));
assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
commitMigrationByJobId(jobId);