This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 575d72ca783 Fix multi unique key migration ignore inventory task when 
the first type is number (#24263)
575d72ca783 is described below

commit 575d72ca78386882e41b487e997b0f8f4d0c7a33
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Feb 20 21:32:48 2023 +0800

    Fix multi unique key migration ignore inventory task when the first type is 
number (#24263)
    
    * Fix multi unique key migration ignore inventory task when the first is 
number type.
    
    * Fix ci
    
    * Merge method
---
 .../ingest/position/PrimaryKeyPositionFactory.java |  2 +-
 .../core/ingest/dumper/InventoryDumper.java        |  9 +++-----
 .../primarykey/IndexesMigrationE2EIT.java          | 25 ++++++++++++++--------
 3 files changed, 20 insertions(+), 16 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
index cbbf3cd6faf..6e8bbabf9bb 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
@@ -63,7 +63,7 @@ public final class PrimaryKeyPositionFactory {
      */
     public static IngestPosition<?> newInstance(final Object beginValue, final 
Object endValue) {
         if (beginValue instanceof Number) {
-            return new IntegerPrimaryKeyPosition(((Number) 
beginValue).longValue(), ((Number) endValue).longValue());
+            return new IntegerPrimaryKeyPosition(((Number) 
beginValue).longValue(), null != endValue ? ((Number) endValue).longValue() : 
Long.MAX_VALUE);
         }
         if (beginValue instanceof CharSequence) {
             return new StringPrimaryKeyPosition(beginValue.toString(), null != 
endValue ? endValue.toString() : null);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 08f43c64f5b..395dc244917 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -140,12 +140,9 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         if (!dumperConfig.hasUniqueKey()) {
             return sqlBuilder.buildNoUniqueKeyInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName());
         }
+        PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) 
dumperConfig.getPosition();
         PipelineColumnMetaData firstColumn = 
dumperConfig.getUniqueKeyColumns().get(0);
-        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType())) {
-            return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName(), firstColumn.getName());
-        }
-        if (PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
-            PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) 
dumperConfig.getPosition();
+        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || 
PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
             if (null != position.getBeginValue() && null != 
position.getEndValue()) {
                 return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, 
dumperConfig.getActualTableName(), firstColumn.getName());
             }
@@ -162,7 +159,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         }
         PipelineColumnMetaData firstColumn = 
dumperConfig.getUniqueKeyColumns().get(0);
         PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) 
dumperConfig.getPosition();
-        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType())) {
+        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && 
null != position.getBeginValue() && null != position.getEndValue()) {
             preparedStatement.setObject(1, position.getBeginValue());
             preparedStatement.setObject(2, position.getEndValue());
             return;
diff --git 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index a1324bfb6a7..9750328847a 100644
--- 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
@@ -33,6 +34,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -84,7 +86,7 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+        assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), 
consistencyCheckAlgorithmType);
     }
     
     @Test
@@ -97,7 +99,7 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+        assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), 
consistencyCheckAlgorithmType);
     }
     
     @Test
@@ -105,12 +107,12 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         String sql;
         String consistencyCheckAlgorithmType;
         if (getDatabaseType() instanceof MySQLDatabaseType) {
-            sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, 
`user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY 
(`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+            sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, `user_id` 
INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             consistencyCheckAlgorithmType = "DATA_MATCH";
         } else {
             return;
         }
-        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+        assertMigrationSuccess(sql, new SnowflakeKeyGenerateAlgorithm(), 
consistencyCheckAlgorithmType);
     }
     
     @Test
@@ -124,14 +126,13 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+        assertMigrationSuccess(sql, new UUIDKeyGenerateAlgorithm(), 
consistencyCheckAlgorithmType);
     }
     
-    private void assertMigrationSuccess(final String sqlPattern, final String 
consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
+    private void assertMigrationSuccess(final String sqlPattern, final 
KeyGenerateAlgorithm generateAlgorithm, final String 
consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
         initEnvironment(getDatabaseType(), new MigrationJobType());
         sourceExecuteWithLog(String.format(sqlPattern, 
getSourceTableOrderName()));
         try (Connection connection = getSourceDataSource().getConnection()) {
-            KeyGenerateAlgorithm generateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
             
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
generateAlgorithm, getSourceTableOrderName(), 
PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
         }
         addMigrationProcessConfig();
@@ -141,8 +142,14 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         startMigration(getSourceTableOrderName(), getTargetTableOrderName());
         String jobId = listJobId().get(0);
         waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", 
jobId));
-        sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) 
VALUES ('a1', 1, 'OK')");
-        assertProxyOrderRecordExist("t_order", "a1");
+        Comparable<?> primaryKey = generateAlgorithm.generateKey();
+        try (PreparedStatement preparedStatement = 
getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order 
(order_id,user_id,status) VALUES (?,?,?)")) {
+            preparedStatement.setObject(1, primaryKey);
+            preparedStatement.setObject(2, 1);
+            preparedStatement.setObject(3, "OK");
+            preparedStatement.execute();
+        }
+        assertProxyOrderRecordExist("t_order", primaryKey);
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", 
jobId));
         assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
         commitMigrationByJobId(jobId);

Reply via email to