This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bc83e6e0d97 Support unique key table check migration (#20944)
bc83e6e0d97 is described below

commit bc83e6e0d97c13abdb449c3b992be7fd8d6abaf6
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Sep 13 20:22:28 2022 +0800

    Support unique key table check migration (#20944)
    
    * Support unique key table check migration
    
    * persist unique key in job configuration
    
    * Fix ci error
    
    * Fix codestyle
    
    * Fix codestyle
---
 .../api/config/job/MigrationJobConfiguration.java  |   3 +
 .../job/yaml/YamlMigrationJobConfiguration.java    |   3 +
 .../yaml/YamlMigrationJobConfigurationSwapper.java |   6 +-
 .../metadata/yaml/YamlPipelineColumnMetaData.java  |  40 +++++++
 .../yaml/YamlPipelineColumnMetaDataSwapper.java    |  51 ++++++++
 .../core/prepare/InventoryTaskSplitter.java        |  66 ++---------
 .../core/util/PipelineTableMetaDataUtil.java       | 129 +++++++++++++++++++++
 .../migration/MigrationDataConsistencyChecker.java |   6 +-
 .../scenario/migration/MigrationJobAPIImpl.java    |   6 +
 .../scenario/migration/MigrationJobPreparer.java   |   8 +-
 .../cases/migration/AbstractMigrationITCase.java   |   4 +-
 .../migration/general/MySQLMigrationGeneralIT.java |   4 +-
 .../general/PostgreSQLMigrationGeneralIT.java      |   4 +-
 .../primarykey/TextPrimaryKeyMigrationIT.java      |  13 ++-
 .../env/scenario/primary_key/unique_key/mysql.xml  |  28 +++++
 .../core/api/impl/MigrationJobAPIImplTest.java     |  27 ++++-
 .../core/prepare/InventoryTaskSplitterTest.java    |  29 ++++-
 17 files changed, 350 insertions(+), 77 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
index 57be24501b7..cb19ff443e6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 
 import java.util.List;
 
@@ -67,6 +68,8 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
     
     private final List<String> jobShardingDataNodes;
     
+    private final PipelineColumnMetaData uniqueKeyColumn;
+    
     private final int concurrency;
     
     private final int retryTimes;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
index 917b9f9367e..34d13bc8d97 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
@@ -23,6 +23,7 @@ import lombok.Setter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
 
 import java.util.List;
 
@@ -66,6 +67,8 @@ public final class YamlMigrationJobConfiguration implements 
YamlPipelineJobConfi
     
     private List<String> jobShardingDataNodes;
     
+    private YamlPipelineColumnMetaData uniqueKeyColumn;
+    
     private int concurrency = 3;
     
     private int retryTimes = 3;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
index 93a1ed0edf8..1fd2e5bf86b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
@@ -32,6 +33,8 @@ public final class YamlMigrationJobConfigurationSwapper 
implements YamlConfigura
     
     private final YamlPipelineDataSourceConfigurationSwapper 
dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
     
+    private final YamlPipelineColumnMetaDataSwapper 
pipelineColumnMetaDataSwapper = new YamlPipelineColumnMetaDataSwapper();
+    
     @Override
     public YamlMigrationJobConfiguration swapToYamlConfiguration(final 
MigrationJobConfiguration data) {
         YamlMigrationJobConfiguration result = new 
YamlMigrationJobConfiguration();
@@ -47,6 +50,7 @@ public final class YamlMigrationJobConfigurationSwapper 
implements YamlConfigura
         
result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
         result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
         result.setJobShardingDataNodes(data.getJobShardingDataNodes());
+        
result.setUniqueKeyColumn(pipelineColumnMetaDataSwapper.swapToYamlConfiguration(data.getUniqueKeyColumn()));
         result.setConcurrency(data.getConcurrency());
         result.setRetryTimes(data.getRetryTimes());
         return result;
@@ -59,7 +63,7 @@ public final class YamlMigrationJobConfigurationSwapper 
implements YamlConfigura
                 yamlConfig.getSourceDatabaseType(), 
yamlConfig.getTargetDatabaseType(),
                 yamlConfig.getSourceTableName(), 
yamlConfig.getTargetTableName(),
                 dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), 
dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
-                yamlConfig.getTablesFirstDataNodes(), 
yamlConfig.getJobShardingDataNodes(),
+                yamlConfig.getTablesFirstDataNodes(), 
yamlConfig.getJobShardingDataNodes(), 
pipelineColumnMetaDataSwapper.swapToObject(yamlConfig.getUniqueKeyColumn()),
                 yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java
new file mode 100644
index 00000000000..260e7cf4786
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.metadata.yaml;
+
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+/**
+ * Yaml pipeline column meta data.
+ */
+@Data
+public final class YamlPipelineColumnMetaData implements YamlConfiguration {
+    
+    private int ordinalPosition;
+    
+    private String name;
+    
+    private int dataType;
+    
+    private String dataTypeName;
+    
+    private boolean nullable;
+    
+    private boolean primaryKey;
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java
new file mode 100644
index 00000000000..8d6064c0684
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/yaml/YamlPipelineColumnMetaDataSwapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.metadata.yaml;
+
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * Yaml pipeline column meta data swapper.
+ */
+public final class YamlPipelineColumnMetaDataSwapper implements 
YamlConfigurationSwapper<YamlPipelineColumnMetaData, PipelineColumnMetaData> {
+    
+    @Override
+    public YamlPipelineColumnMetaData swapToYamlConfiguration(final 
PipelineColumnMetaData data) {
+        if (null == data) {
+            return null;
+        }
+        YamlPipelineColumnMetaData result = new YamlPipelineColumnMetaData();
+        result.setName(data.getName());
+        result.setDataType(data.getDataType());
+        result.setDataTypeName(data.getDataTypeName());
+        result.setNullable(data.isNullable());
+        result.setPrimaryKey(data.isPrimaryKey());
+        result.setOrdinalPosition(data.getOrdinalPosition());
+        return result;
+    }
+    
+    @Override
+    public PipelineColumnMetaData swapToObject(final 
YamlPipelineColumnMetaData yamlConfig) {
+        if (null == yamlConfig) {
+            return null;
+        }
+        return new PipelineColumnMetaData(yamlConfig.getOrdinalPosition(), 
yamlConfig.getName(), yamlConfig.getDataType(), yamlConfig.getDataTypeName(), 
yamlConfig.isNullable(),
+                yamlConfig.isPrimaryKey());
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 04856d1f356..7cc8a52f8ec 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.prepare;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -28,15 +27,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
@@ -68,7 +63,7 @@ public final class InventoryTaskSplitter {
     
     private final PipelineDataSourceWrapper sourceDataSource;
     
-    private final DumperConfiguration dumperConfig;
+    private final InventoryDumperConfiguration dumperConfig;
     
     private final ImporterConfiguration importerConfig;
     
@@ -97,15 +92,15 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitDumperConfig(final 
InventoryIncrementalJobItemContext jobItemContext, final DumperConfiguration 
dumperConfig) {
+    private Collection<InventoryDumperConfiguration> splitDumperConfig(final 
InventoryIncrementalJobItemContext jobItemContext, final 
InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
-            result.addAll(splitByPrimaryKey(each, jobItemContext, 
sourceDataSource, metaDataLoader));
+            result.addAll(splitByPrimaryKey(each, jobItemContext, 
sourceDataSource));
         }
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitByTable(final 
DumperConfiguration dumperConfig) {
+    private Collection<InventoryDumperConfiguration> splitByTable(final 
InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         dumperConfig.getTableNameMap().forEach((key, value) -> {
             InventoryDumperConfiguration inventoryDumperConfig = new 
InventoryDumperConfiguration(dumperConfig);
@@ -113,19 +108,21 @@ public final class InventoryTaskSplitter {
             inventoryDumperConfig.setActualTableName(key.getOriginal());
             inventoryDumperConfig.setLogicTableName(value.getOriginal());
             inventoryDumperConfig.setPosition(new PlaceholderPosition());
+            inventoryDumperConfig.setUniqueKey(dumperConfig.getUniqueKey());
+            
inventoryDumperConfig.setUniqueKeyDataType(dumperConfig.getUniqueKeyDataType());
             result.add(inventoryDumperConfig);
         });
         return result;
     }
     
     private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final 
InventoryDumperConfiguration dumperConfig, final 
InventoryIncrementalJobItemContext jobItemContext,
-                                                                       final 
DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+                                                                       final 
DataSource dataSource) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         InventoryIncrementalProcessContext jobProcessContext = 
jobItemContext.getJobProcessContext();
         PipelineReadConfiguration readConfig = 
jobProcessContext.getPipelineProcessConfig().getRead();
         int batchSize = readConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = 
jobProcessContext.getReadRateLimitAlgorithm();
-        Collection<IngestPosition<?>> inventoryPositions = 
getInventoryPositions(jobItemContext, dumperConfig, dataSource, metaDataLoader);
+        Collection<IngestPosition<?>> inventoryPositions = 
getInventoryPositions(jobItemContext, dumperConfig, dataSource);
         int i = 0;
         for (IngestPosition<?> inventoryPosition : inventoryPositions) {
             InventoryDumperConfiguration splitDumperConfig = new 
InventoryDumperConfiguration(dumperConfig);
@@ -143,58 +140,19 @@ public final class InventoryTaskSplitter {
     }
     
     private Collection<IngestPosition<?>> getInventoryPositions(final 
InventoryIncrementalJobItemContext jobItemContext, final 
InventoryDumperConfiguration dumperConfig,
-                                                                final 
DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
-        String schemaName = dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName()));
-        String actualTableName = dumperConfig.getActualTableName();
-        PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaName, actualTableName);
-        PipelineColumnMetaData uniqueKeyColumn = 
mustGetAnAppropriateUniqueKeyColumn(tableMetaData, actualTableName);
+                                                                final 
DataSource dataSource) {
         if (null != initProgress && initProgress.getStatus() != 
JobStatus.PREPARING_FAILURE) {
-            Collection<IngestPosition<?>> result = 
initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
-            for (IngestPosition<?> each : result) {
-                if (each instanceof PrimaryKeyPosition) {
-                    dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
-                    
dumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
-                    break;
-                }
-            }
             // Do NOT filter FinishedPosition here, since whole inventory 
tasks are required in job progress when persisting to register center.
-            return result;
+            return 
initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
         }
-        dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
-        int uniqueKeyDataType = uniqueKeyColumn.getDataType();
-        dumperConfig.setUniqueKeyDataType(uniqueKeyDataType);
+        int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
         if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
             return getPositionByIntegerPrimaryKeyRange(jobItemContext, 
dataSource, dumperConfig);
         } else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
             return getPositionByStringPrimaryKeyRange();
         } else {
-            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: primary key is not integer or string type", 
actualTableName));
-        }
-    }
-    
-    private PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final 
PipelineTableMetaData tableMetaData, final String tableName) {
-        if (null == tableMetaData) {
-            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: can not get table metadata ", tableName));
-        }
-        List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
-        if (primaryKeys.size() > 1) {
-            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: primary key is union primary", tableName));
-        }
-        if (1 == primaryKeys.size()) {
-            return 
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
-        }
-        Collection<PipelineIndexMetaData> uniqueIndexes = 
tableMetaData.getUniqueIndexes();
-        if (uniqueIndexes.isEmpty()) {
-            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: no primary key or unique index", tableName));
-        }
-        if (1 == uniqueIndexes.size() && 1 == 
uniqueIndexes.iterator().next().getColumns().size()) {
-            PipelineColumnMetaData column = 
uniqueIndexes.iterator().next().getColumns().get(0);
-            if (!column.isNullable()) {
-                return column;
-            }
+            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: primary key is not integer or string type", 
dumperConfig.getActualTableName()));
         }
-        throw new PipelineJobCreationException(
-                String.format("Can not split range for table %s, reason: table 
contains multiple unique index or unique index contains nullable/multiple 
column(s)", tableName));
     }
     
     private Collection<IngestPosition<?>> 
getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext 
jobItemContext, final DataSource dataSource,
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
new file mode 100644
index 00000000000..f6d04b192e5
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Pipeline table meta data util.
+ */
+public final class PipelineTableMetaDataUtil {
+    
+    /**
+     * Get pipeline table meta data.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param dataSourceConfig source configuration
+     * @param loader pipeline table meta data loader* @return pipeline table meta data
+     * @return pipeline table meta data
+     */
+    @SneakyThrows(SQLException.class)
+    public static PipelineTableMetaData getPipelineTableMetaData(final String 
schemaName, final String tableName, final 
StandardPipelineDataSourceConfiguration dataSourceConfig,
+                                                                 final 
PipelineTableMetaDataLoader loader) {
+        try (PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
+            return getPipelineTableMetaData(schemaName, tableName, dataSource, 
loader);
+        }
+    }
+    
+    /**
+     * Get pipeline table meta data.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param dataSource data source
+     * @param loader pipeline table meta data loader
+     * @return pipeline table meta data.
+     */
+    public static PipelineTableMetaData getPipelineTableMetaData(final String 
schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
+                                                                 final 
PipelineTableMetaDataLoader loader) {
+        if (null == loader) {
+            return new 
StandardPipelineTableMetaDataLoader(dataSource).getTableMetaData(schemaName, 
tableName);
+        } else {
+            return loader.getTableMetaData(schemaName, tableName);
+        }
+    }
+    
+    /**
+     * Get unique key column, if primary key exists, return primary key, 
otherwise return the first unique key.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param dataSourceConfig data source config
+     * @param loader pipeline table meta data loader
+     * @return pipeline column meta data.
+     */
+    public static PipelineColumnMetaData getUniqueKeyColumn(final String 
schemaName, final String tableName, final 
StandardPipelineDataSourceConfiguration dataSourceConfig,
+                                                            final 
StandardPipelineTableMetaDataLoader loader) {
+        PipelineTableMetaData pipelineTableMetaData = 
getPipelineTableMetaData(schemaName, tableName, dataSourceConfig, loader);
+        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, 
tableName);
+    }
+    
+    /**
+     * Get unique key column, if primary key exists, return primary key, 
otherwise return the first unique key.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param dataSource data source
+     * @param loader pipeline table meta data loader
+     * @return pipeline column meta data.
+     */
+    public static PipelineColumnMetaData getUniqueKeyColumn(final String 
schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
+                                                            final 
StandardPipelineTableMetaDataLoader loader) {
+        PipelineTableMetaData pipelineTableMetaData = 
getPipelineTableMetaData(schemaName, tableName, dataSource, loader);
+        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, 
tableName);
+    }
+    
+    private static PipelineColumnMetaData 
mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, 
final String tableName) {
+        if (null == tableMetaData) {
+            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: can not get table metadata ", tableName));
+        }
+        List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
+        if (primaryKeys.size() > 1) {
+            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: primary key is union primary", tableName));
+        }
+        if (1 == primaryKeys.size()) {
+            return 
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+        }
+        Collection<PipelineIndexMetaData> uniqueIndexes = 
tableMetaData.getUniqueIndexes();
+        if (uniqueIndexes.isEmpty()) {
+            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: no primary key or unique index", tableName));
+        }
+        if (1 == uniqueIndexes.size() && 1 == 
uniqueIndexes.iterator().next().getColumns().size()) {
+            PipelineColumnMetaData column = 
uniqueIndexes.iterator().next().getColumns().get(0);
+            if (!column.isNullable()) {
+                return column;
+            }
+        }
+        throw new PipelineJobCreationException(
+                String.format("Can not split range for table %s, reason: table 
contains multiple unique index or unique index contains nullable/multiple 
column(s)", tableName));
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index bbd61d295e6..c6061c1eaf4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -32,8 +32,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineDataConsistencyCheckFailedException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -174,12 +174,12 @@ public final class MigrationDataConsistencyChecker {
             String sourceDatabaseType = 
sourceDataSourceConfig.getDatabaseType().getType();
             String targetDatabaseType = 
targetDataSourceConfig.getDatabaseType().getType();
             for (String each : Collections.singletonList(sourceTableName)) {
-                PipelineTableMetaData tableMetaData = new 
StandardPipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each),
 each);
+                PipelineTableMetaData tableMetaData = 
PipelineTableMetaDataUtil.getPipelineTableMetaData(tableNameSchemaNameMapping.getSchemaName(each),
 each, sourceDataSource, null);
                 if (null == tableMetaData) {
                     throw new PipelineDataConsistencyCheckFailedException("Can 
not get metadata for table " + each);
                 }
                 Collection<String> columnNames = 
tableMetaData.getColumnNames();
-                PipelineColumnMetaData uniqueKey = 
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+                PipelineColumnMetaData uniqueKey = 
jobConfig.getUniqueKeyColumn();
                 DataConsistencyCalculateParameter sourceParameter = 
buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, 
sourceDatabaseType, targetDatabaseType, uniqueKey);
                 DataConsistencyCalculateParameter targetParameter = 
buildParameter(targetDataSource, tableNameSchemaNameMapping, targetTableName, 
columnNames, targetDatabaseType, sourceDatabaseType,
                         uniqueKey);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 2fa0d392489..e68b75c7a1f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -51,6 +51,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
@@ -67,6 +69,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.AddMigr
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.DropMigrationSourceResourceException;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
@@ -465,6 +468,9 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         
result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
         result.setTargetDatabaseName(targetDatabaseName);
         result.setTargetTableName(parameter.getTargetTableName());
+        YamlPipelineColumnMetaData uniqueKeyColumn = new 
YamlPipelineColumnMetaDataSwapper().swapToYamlConfiguration(PipelineTableMetaDataUtil.getUniqueKeyColumn(sourceSchemaName,
+                parameter.getSourceTableName(), sourceDataSourceConfig, null));
+        result.setUniqueKeyColumn(uniqueKeyColumn);
         extendYamlJobConfiguration(result);
         MigrationJobConfiguration jobConfiguration = new 
YamlMigrationJobConfigurationSwapper().swapToObject(result);
         start(jobConfiguration);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index e17a19ff197..99693e5e20a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -26,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineIgnoredException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
@@ -143,8 +145,12 @@ public final class MigrationJobPreparer {
     }
     
     private void initInventoryTasks(final MigrationJobItemContext 
jobItemContext) {
+        InventoryDumperConfiguration inventoryDumperConfig = new 
InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
+        PipelineColumnMetaData uniqueKeyColumn = 
jobItemContext.getJobConfig().getUniqueKeyColumn();
+        inventoryDumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+        
inventoryDumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
         InventoryTaskSplitter inventoryTaskSplitter = new 
InventoryTaskSplitter(
-                jobItemContext.getSourceDataSource(), 
jobItemContext.getTaskConfig().getDumperConfig(), 
jobItemContext.getTaskConfig().getImporterConfig(), 
jobItemContext.getInitProgress(),
+                jobItemContext.getSourceDataSource(), inventoryDumperConfig, 
jobItemContext.getTaskConfig().getImporterConfig(), 
jobItemContext.getInitProgress(),
                 jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager(), 
jobItemContext.getJobProcessContext().getImporterExecuteEngine());
         
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
     }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 05d525474fe..fcdd0f9fd72 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -159,7 +159,7 @@ public abstract class AbstractMigrationITCase extends 
BaseITCase {
         return jobList.stream().filter(a -> 
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new 
RuntimeException("not find " + tableName + " table")).get("id").toString();
     }
     
-    protected void assertCheckMigrationSuccess(final String jobId) {
+    protected void assertCheckMigrationSuccess(final String jobId, final 
String algorithmType) {
         for (int i = 0; i < 5; i++) {
             if (checkJobIncrementTaskFinished(jobId)) {
                 break;
@@ -168,7 +168,7 @@ public abstract class AbstractMigrationITCase extends 
BaseITCase {
         }
         boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
         log.info("second check job result: {}", secondCheckJobResult);
-        List<Map<String, Object>> checkJobResults = 
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE 
(NAME='DATA_MATCH')", jobId));
+        List<Map<String, Object>> checkJobResults = 
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", 
jobId, algorithmType));
         log.info("check job results: {}", checkJobResults);
         for (Map<String, Object> entry : checkJobResults) {
             
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 3cfcd013495..7c3889a7827 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -42,7 +42,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 /**
- * General scaling test case, includes multiple cases.
+ * General migration test case, includes multiple cases.
  */
 @Slf4j
 @RunWith(Parameterized.class)
@@ -103,7 +103,7 @@ public final class MySQLMigrationGeneralIT extends 
AbstractMigrationITCase {
     
     private void assertMigrationSuccessById(final String jobId) throws 
SQLException, InterruptedException {
         waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId);
+        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 81b5bed2ba9..e181a40838c 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -111,7 +111,7 @@ public final class PostgreSQLMigrationGeneralIT extends 
AbstractMigrationITCase
         sourceExecuteWithLog(String.format("INSERT INTO %s.t_order_copy 
(order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, 
KEY_GENERATE_ALGORITHM.generateKey(),
                 1, "afterStop"));
         startMigrationByJobId(jobId);
-        assertCheckMigrationSuccess(jobId);
+        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
     
@@ -119,7 +119,7 @@ public final class PostgreSQLMigrationGeneralIT extends 
AbstractMigrationITCase
         startMigrationOrderItem(true);
         String jobId = getJobIdByTableName("t_order_item");
         waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId);
+        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index d9d681514d2..d180deae6a8 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.integration.data.pipeline.cases.migration.Abstr
 import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
+import 
org.apache.shardingsphere.test.integration.env.container.atomic.util.DatabaseTypeUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -38,7 +39,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 @RunWith(Parameterized.class)
@@ -58,6 +59,7 @@ public class TextPrimaryKeyMigrationIT extends 
AbstractMigrationITCase {
         }
         for (String version : ENV.listDatabaseDockerImageNames(new 
MySQLDatabaseType())) {
             result.add(new ScalingParameterized(new MySQLDatabaseType(), 
version, "env/scenario/primary_key/text_primary_key/mysql.xml"));
+            result.add(new ScalingParameterized(new MySQLDatabaseType(), 
version, "env/scenario/primary_key/unique_key/mysql.xml"));
         }
         for (String version : ENV.listDatabaseDockerImageNames(new 
PostgreSQLDatabaseType())) {
             result.add(new ScalingParameterized(new PostgreSQLDatabaseType(), 
version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
@@ -79,8 +81,14 @@ public class TextPrimaryKeyMigrationIT extends 
AbstractMigrationITCase {
         startMigrationOrder();
         String jobId = listJobId().get(0);
         waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        sourceExecuteWithLog(String.format("INSERT INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1, "afterStop"));
+        // TODO The ordering of primary or unique keys for text types is different, may cause check failed, need fix
+        if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
+            assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
+        } else {
+            assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+        }
         stopMigrationByJobId(jobId);
-        assertCheckMigrationSuccess(jobId);
         if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
             commitMigrationByJobId(jobId);
             List<String> lastJobIds = listJobId();
@@ -100,5 +108,6 @@ public class TextPrimaryKeyMigrationIT extends 
AbstractMigrationITCase {
             }
             preparedStatement.executeBatch();
         }
+        log.info("init data succeed");
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
new file mode 100644
index 00000000000..5f8a288d79a
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
@@ -0,0 +1,28 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<command>
+    <create-table-order>
+        CREATE TABLE `t_order` (
+        `order_id` varchar(255) NOT NULL,
+        `user_id` INT NOT NULL,
+        `status` varchar(255) NULL,
+        `t_unsigned_int` int UNSIGNED NULL,
+        CONSTRAINT unique_id UNIQUE (order_id),
+        INDEX ( `user_id` )
+        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+    </create-table-order>
+</command>
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 1b17ff77540..edd701f1189 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -35,9 +36,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import 
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.junit.AfterClass;
@@ -77,8 +80,7 @@ public final class MigrationJobAPIImplTest {
         PipelineContextUtil.mockModeConfigAndContextManager();
         jobAPI = MigrationJobAPIFactory.getInstance();
         Map<String, Object> props = new HashMap<>();
-        // TODO if resource availability is checked, then it should not work
-        props.put("jdbcUrl", "jdbc:mysql://localhost:3306/test");
+        props.put("jdbcUrl", 
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
         props.put("username", "root");
         props.put("password", "root");
         Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
@@ -151,8 +153,10 @@ public final class MigrationJobAPIImplTest {
     }
     
     @Test
-    public void assertDataConsistencyCheck() {
-        Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+    public void assertDataConsistencyCheck() throws NoSuchFieldException, 
IllegalAccessException {
+        MigrationJobConfiguration jobConfiguration = 
JobConfigurationBuilder.createJobConfiguration();
+        ReflectionUtil.setFieldValue(jobConfiguration, "uniqueKeyColumn", new 
PipelineColumnMetaData(1, "order_id", 4, "", false, true));
+        Optional<String> jobId = jobAPI.start(jobConfiguration);
         assertTrue(jobId.isPresent());
         MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(jobId.get());
         if (null == jobConfig.getSource()) {
@@ -266,7 +270,8 @@ public final class MigrationJobAPIImplTest {
     }
     
     @Test
-    public void assertCreateJobConfig() {
+    public void assertCreateJobConfig() throws SQLException {
+        initIntPrimaryEnvironment();
         CreateMigrationJobParameter parameter = new 
CreateMigrationJobParameter("ds_0", null, "t_order", "logic_db", "t_order");
         String jobId = jobAPI.createJobAndStart(parameter);
         MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(jobId);
@@ -276,6 +281,18 @@ public final class MigrationJobAPIImplTest {
         assertThat(jobConfig.getTargetTableName(), is("t_order"));
     }
     
+    private void initIntPrimaryEnvironment() throws SQLException {
+        Map<String, DataSourceProperties> metaDataDataSource = new 
PipelineDataSourcePersistService().load(JobType.MIGRATION);
+        DataSourceProperties dataSourceProperties = 
metaDataDataSource.get("ds_0");
+        DataSource dataSource = 
DataSourcePoolCreator.create(dataSourceProperties);
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS t_order");
+            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, 
user_id int(10))");
+        }
+    }
+    
     @Test
     public void assertShowMigrationSourceResources() {
         Collection<Collection<Object>> actual = 
jobAPI.listMigrationSourceResources();
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 28a1b00a4af..06d4a52e159 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -18,13 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.core.prepare;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
 import org.junit.After;
@@ -36,9 +41,11 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 public final class InventoryTaskSplitterTest {
@@ -57,15 +64,19 @@ public final class InventoryTaskSplitterTest {
     }
     
     @Before
-    public void setUp() {
+    public void setUp() throws NoSuchFieldException, IllegalAccessException {
         initJobItemContext();
+        InventoryDumperConfiguration dumperConfig = new 
InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
+        dumperConfig.setUniqueKeyDataType(Types.INTEGER);
+        dumperConfig.setUniqueKey("order_id");
         inventoryTaskSplitter = new InventoryTaskSplitter(
-                jobItemContext.getSourceDataSource(), 
jobItemContext.getTaskConfig().getDumperConfig(), 
jobItemContext.getTaskConfig().getImporterConfig(), 
jobItemContext.getInitProgress(),
+                jobItemContext.getSourceDataSource(), dumperConfig, 
jobItemContext.getTaskConfig().getImporterConfig(), 
jobItemContext.getInitProgress(),
                 jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager(), 
jobItemContext.getJobProcessContext().getImporterExecuteEngine());
     }
     
-    private void initJobItemContext() {
+    private void initJobItemContext() throws NoSuchFieldException, 
IllegalAccessException {
         MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
+        ReflectionUtil.setFieldValue(jobConfig, "uniqueKeyColumn", new 
PipelineColumnMetaData(1, "order_id", 4, "", false, true));
         jobItemContext = 
PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
         dataSourceManager = jobItemContext.getDataSourceManager();
         taskConfig = jobItemContext.getTaskConfig();
@@ -110,14 +121,22 @@ public final class InventoryTaskSplitterTest {
     }
     
     @Test(expected = PipelineJobCreationException.class)
-    public void assertSplitInventoryDataWithUnionPrimary() throws SQLException 
{
+    public void assertSplitInventoryDataWithIllegalKeyDataType() throws 
SQLException, NoSuchFieldException, IllegalAccessException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
+        InventoryDumperConfiguration dumperConfig = 
ReflectionUtil.getFieldValue(inventoryTaskSplitter, "dumperConfig", 
InventoryDumperConfiguration.class);
+        assertNotNull(dumperConfig);
+        dumperConfig.setUniqueKey("order_id,user_id");
+        dumperConfig.setUniqueKeyDataType(Integer.MIN_VALUE);
         inventoryTaskSplitter.splitInventoryData(jobItemContext);
     }
     
     @Test(expected = PipelineJobCreationException.class)
-    public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws 
SQLException {
+    public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws 
SQLException, NoSuchFieldException, IllegalAccessException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
+        try (PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()))
 {
+            PipelineColumnMetaData uniqueKeyColumn = 
PipelineTableMetaDataUtil.getUniqueKeyColumn(null, "t_order", dataSource, null);
+            ReflectionUtil.setFieldValue(jobItemContext.getJobConfig(), 
"uniqueKeyColumn", uniqueKeyColumn);
+        }
         inventoryTaskSplitter.splitInventoryData(jobItemContext);
     }
     

Reply via email to