This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 049dee5653e Remove unused algorithm config, add dataConsistencyCalculateAlgorithm chooser (#20527)
049dee5653e is described below

commit 049dee5653e848c79d50c6bb65bcf8656b0c12de
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 26 09:20:39 2022 +0800

    Remove unused algorithm config, add dataConsistencyCalculateAlgorithm chooser (#20527)
    
    * Remove unused completionDetectAlgorithm
    
    * Remove unused dataConsistencyCalculateAlgorithm, add algorithm chooser
    
    * Remove unused isDataConsistencyCheckNeeded
    
    * Move getType
    
    * Move and rename DataConsistencyChecker
---
 .../data/pipeline/api/MigrationJobPublicAPI.java   |  8 ---
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  5 ++
 .../DataConsistencyCalculateAlgorithmChooser.java  | 50 +++++++++++++++++++
 .../consistency}/DataConsistencyCheckUtils.java    |  2 +-
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  2 +-
 .../MigrationDataConsistencyChecker.java}          | 30 ++----------
 .../scenario/migration/MigrationJobAPI.java        |  8 ---
 .../scenario/migration/MigrationJobAPIImpl.java    | 50 +++----------------
 .../migration/MigrationProcessContext.java         | 10 ----
 .../scaling/core/job/dumper/DumperFactory.java     | 29 -----------
 ...taConsistencyCalculateAlgorithmChooserTest.java | 57 ++++++++++++++++++++++
 .../DataConsistencyCheckUtilsTest.java             |  2 +-
 .../core/fixture/MigrationJobAPIFixture.java       | 10 ----
 .../core/api/impl/MigrationJobAPIImplTest.java     |  7 ---
 .../MigrationDataConsistencyCheckerTest.java}      |  7 ++-
 15 files changed, 128 insertions(+), 149 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index c77ea3e77ef..e2506a4cc66 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -61,14 +61,6 @@ public interface MigrationJobPublicAPI extends 
PipelineJobPublicAPI, RequiredSPI
      */
     Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms();
     
-    /**
-     * Is data consistency check needed.
-     *
-     * @param jobId job id
-     * @return data consistency check needed or not
-     */
-    boolean isDataConsistencyCheckNeeded(String jobId);
-    
     /**
      * Do data consistency check.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 72d8d84ef97..4ba03f71e41 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -224,4 +224,9 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
             throw new PipelineVerifyFailedException("Job is not stopped. You 
could run `STOP MIGRATION {jobId}` to stop it.");
         }
     }
+    
+    @Override
+    public String getType() {
+        return getJobType().getTypeName();
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
new file mode 100644
index 00000000000..5e50032e3eb
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+
+/**
+ * Data consistency calculate algorithm chooser.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DataConsistencyCalculateAlgorithmChooser {
+    
+    /**
+     * Choose data consistency calculate algorithm when it's not defined.
+     *
+     * @param databaseType database type
+     * @param peerDatabaseType peer database type
+     * @return algorithm
+     */
+    public static DataConsistencyCalculateAlgorithm choose(final DatabaseType 
databaseType, final DatabaseType peerDatabaseType) {
+        String algorithmType;
+        if 
(!databaseType.getType().equalsIgnoreCase(peerDatabaseType.getType())) {
+            algorithmType = "DATA_MATCH";
+        } else if (databaseType instanceof MySQLDatabaseType) {
+            algorithmType = "CRC32_MATCH";
+        } else {
+            algorithmType = "DATA_MATCH";
+        }
+        return 
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, null);
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtils.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtils.java
similarity index 96%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtils.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtils.java
index c38ef4d9d61..a4c703989c6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtils.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 6e26385013a..849fa41fbab 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -28,7 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.ColumnValueReaderFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.util.DataConsistencyCheckUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
similarity index 88%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index dfb7d2da219..9b61a39e315 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
-import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -30,7 +29,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -40,10 +38,6 @@ import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsist
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -66,12 +60,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Data consistency checker.
+ * Data consistency checker for migration job.
  */
 @Slf4j
-public final class DataConsistencyChecker {
+public final class MigrationDataConsistencyChecker {
     
-    // TODO remove jobConfig for common usage
     private final MigrationJobConfiguration jobConfig;
     
     private final String sourceTableName;
@@ -80,7 +73,7 @@ public final class DataConsistencyChecker {
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
-    public DataConsistencyChecker(final MigrationJobConfiguration jobConfig, 
final JobRateLimitAlgorithm readRateLimitAlgorithm) {
+    public MigrationDataConsistencyChecker(final MigrationJobConfiguration 
jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
         this.jobConfig = jobConfig;
         this.sourceTableName = jobConfig.getSourceTableName();
         tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
 Collections.singletonList(jobConfig.getSourceTableName())));
@@ -216,21 +209,6 @@ public final class DataConsistencyChecker {
         }
     }
     
-    private ShardingSphereTable getTableMetaData(final String databaseName, 
final String logicTableName) {
-        ContextManager contextManager = PipelineContext.getContextManager();
-        Preconditions.checkNotNull(contextManager, "ContextManager null");
-        ShardingSphereDatabase database = 
contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
-        if (null == database) {
-            throw new RuntimeException("Can not get meta data by database name 
" + databaseName);
-        }
-        String schemaName = 
tableNameSchemaNameMapping.getSchemaName(logicTableName);
-        ShardingSphereSchema schema = database.getSchema(schemaName);
-        if (null == schema) {
-            throw new RuntimeException("Can not get schema by schema name " + 
schemaName + ", logicTableName=" + logicTableName);
-        }
-        return schema.get(logicTableName);
-    }
-    
     private DataConsistencyCalculateParameter buildParameter(final 
PipelineDataSourceWrapper sourceDataSource, final TableNameSchemaNameMapping 
tableNameSchemaNameMapping,
                                                              final String 
tableName, final Collection<String> columnNames,
                                                              final String 
sourceDatabaseType, final String targetDatabaseType, final 
PipelineColumnMetaData uniqueKey) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 95cea7fe14f..a7ac132b82d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -63,14 +63,6 @@ public interface MigrationJobAPI extends PipelineJobAPI, 
MigrationJobPublicAPI,
     @Override
     InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int 
shardingItem);
     
-    /**
-     * Is data consistency check needed.
-     *
-     * @param jobConfig job configuration
-     * @return data consistency check needed or not
-     */
-    boolean isDataConsistencyCheckNeeded(MigrationJobConfiguration jobConfig);
-    
     /**
      * Do data consistency check.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index ff38a56e71a..9102c356ebe 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -53,12 +53,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.DropMigrationSourceResourceException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -68,6 +67,7 @@ import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcess
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
@@ -267,13 +267,6 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
     }
     
-    private void verifyManualMode(final MigrationJobConfiguration jobConfig) {
-        MigrationProcessContext processContext = 
buildPipelineProcessContext(jobConfig);
-        if (null != processContext.getCompletionDetectAlgorithm()) {
-            throw new PipelineVerifyFailedException("It's not necessary to do 
it in auto mode.");
-        }
-    }
-    
     @Override
     public Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms() {
         checkModeConfig();
@@ -286,40 +279,19 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         }).collect(Collectors.toList());
     }
     
-    @Override
-    public boolean isDataConsistencyCheckNeeded(final String jobId) {
-        log.info("isDataConsistencyCheckNeeded for job {}", jobId);
-        MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
-        return isDataConsistencyCheckNeeded(jobConfig);
-    }
-    
-    @Override
-    public boolean isDataConsistencyCheckNeeded(final 
MigrationJobConfiguration jobConfig) {
-        MigrationProcessContext processContext = 
buildPipelineProcessContext(jobConfig);
-        return isDataConsistencyCheckNeeded(processContext);
-    }
-    
-    private boolean isDataConsistencyCheckNeeded(final MigrationProcessContext 
processContext) {
-        return null != processContext.getDataConsistencyCalculateAlgorithm();
-    }
-    
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
String jobId) {
         checkModeConfig();
         log.info("Data consistency check for job {}", jobId);
         MigrationJobConfiguration jobConfig = 
getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        verifyDataConsistencyCheck(jobConfig);
         return dataConsistencyCheck(jobConfig);
     }
     
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
MigrationJobConfiguration jobConfig) {
-        MigrationProcessContext processContext = 
buildPipelineProcessContext(jobConfig);
-        if (!isDataConsistencyCheckNeeded(processContext)) {
-            log.info("DataConsistencyCalculatorAlgorithm is not configured, 
data consistency check is ignored.");
-            return Collections.emptyMap();
-        }
-        return dataConsistencyCheck(jobConfig, 
processContext.getDataConsistencyCalculateAlgorithm());
+        DataConsistencyCalculateAlgorithm algorithm = 
DataConsistencyCalculateAlgorithmChooser.choose(
+                
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), 
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()));
+        return dataConsistencyCheck(jobConfig, algorithm);
     }
     
     @Override
@@ -327,23 +299,18 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         checkModeConfig();
         log.info("Data consistency check for job {}, algorithmType: {}", 
jobId, algorithmType);
         MigrationJobConfiguration jobConfig = 
getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        verifyDataConsistencyCheck(jobConfig);
         return dataConsistencyCheck(jobConfig, 
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, 
algorithmProps));
     }
     
     private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
MigrationJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm 
calculator) {
         String jobId = jobConfig.getJobId();
         JobRateLimitAlgorithm readRateLimitAlgorithm = 
buildPipelineProcessContext(jobConfig).getReadRateLimitAlgorithm();
-        Map<String, DataConsistencyCheckResult> result = new 
DataConsistencyChecker(jobConfig, readRateLimitAlgorithm).check(calculator);
+        Map<String, DataConsistencyCheckResult> result = new 
MigrationDataConsistencyChecker(jobConfig, 
readRateLimitAlgorithm).check(calculator);
         log.info("Scaling job {} with check algorithm '{}' data consistency 
checker result {}", jobId, calculator.getType(), result);
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, 
aggregateDataConsistencyCheckResults(jobId, result));
         return result;
     }
     
-    private void verifyDataConsistencyCheck(final MigrationJobConfiguration 
jobConfig) {
-        verifyManualMode(jobConfig);
-    }
-    
     @Override
     public boolean aggregateDataConsistencyCheckResults(final String jobId, 
final Map<String, DataConsistencyCheckResult> checkResults) {
         if (checkResults.isEmpty()) {
@@ -489,9 +456,4 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         result.setParameter(parameter);
         return result;
     }
-    
-    @Override
-    public String getType() {
-        return getJobType().getTypeName();
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index ab70abf2231..058fae8e271 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -19,11 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.context.AbstractPipelineProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 
 /**
@@ -33,13 +29,7 @@ import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcess
 @Slf4j
 public final class MigrationProcessContext extends 
AbstractPipelineProcessContext {
     
-    private final 
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> 
completionDetectAlgorithm;
-    
-    private final DataConsistencyCalculateAlgorithm 
dataConsistencyCalculateAlgorithm;
-    
     public MigrationProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
         super(jobId, originalProcessConfig);
-        completionDetectAlgorithm = null;
-        dataConsistencyCalculateAlgorithm = 
DataConsistencyCalculateAlgorithmFactory.newInstance("DATA_MATCH", null);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
deleted file mode 100644
index dadfcb92fd6..00000000000
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.dumper;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
-/**
- * Dumper factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class DumperFactory {
-    
-}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooserTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooserTest.java
new file mode 100644
index 00000000000..dc3ec6ec8c0
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooserTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+
+import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+public final class DataConsistencyCalculateAlgorithmChooserTest {
+    
+    @Test
+    public void assertChooseOnDifferentDatabaseTypes() {
+        DatabaseType databaseType = DatabaseTypeFactory.getInstance("Oracle");
+        DatabaseType peerDatabaseType = 
DatabaseTypeFactory.getInstance("PostgreSQL");
+        DataConsistencyCalculateAlgorithm actual = 
DataConsistencyCalculateAlgorithmChooser.choose(databaseType, peerDatabaseType);
+        assertNotNull(actual);
+        assertThat(actual.getType(), is("DATA_MATCH"));
+    }
+    
+    @Test
+    public void assertChooseOnMySQL() {
+        DatabaseType databaseType = DatabaseTypeFactory.getInstance("MySQL");
+        DatabaseType peerDatabaseType = 
DatabaseTypeFactory.getInstance("MySQL");
+        DataConsistencyCalculateAlgorithm actual = 
DataConsistencyCalculateAlgorithmChooser.choose(databaseType, peerDatabaseType);
+        assertNotNull(actual);
+        assertThat(actual.getType(), is("CRC32_MATCH"));
+    }
+    
+    @Test
+    public void assertChooseOnPostgreSQL() {
+        DatabaseType databaseType = 
DatabaseTypeFactory.getInstance("PostgreSQL");
+        DatabaseType peerDatabaseType = 
DatabaseTypeFactory.getInstance("PostgreSQL");
+        DataConsistencyCalculateAlgorithm actual = 
DataConsistencyCalculateAlgorithmChooser.choose(databaseType, peerDatabaseType);
+        assertNotNull(actual);
+        assertThat(actual.getType(), is("DATA_MATCH"));
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtilsTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtilsTest.java
similarity index 94%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtilsTest.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtilsTest.java
index f7c25a82bf8..b04217ffc8c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataConsistencyCheckUtilsTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckUtilsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import org.junit.Test;
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index c63d89abccb..154b9a703e9 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -105,16 +105,6 @@ public final class MigrationJobAPIFixture implements 
MigrationJobAPI {
         return null;
     }
     
-    @Override
-    public boolean isDataConsistencyCheckNeeded(final String jobId) {
-        return false;
-    }
-    
-    @Override
-    public boolean isDataConsistencyCheckNeeded(final 
MigrationJobConfiguration jobConfig) {
-        return false;
-    }
-    
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
String jobId) {
         return null;
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index feff2a6b343..16816224621 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -139,13 +139,6 @@ public final class MigrationJobAPIImplTest {
         assertThat(jobProgressMap.size(), is(1));
     }
     
-    @Test
-    public void assertIsDataConsistencyCheckNeeded() {
-        Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
-        assertTrue(jobId.isPresent());
-        assertTrue(jobAPI.isDataConsistencyCheckNeeded(jobId.get()));
-    }
-    
     @Test
     public void assertDataConsistencyCheck() {
         Optional<String> jobId = 
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
similarity index 89%
rename from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index 2f0e02f975a..7cb88d883df 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDa
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -37,7 +36,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-public final class DataConsistencyCheckerTest {
+public final class MigrationDataConsistencyCheckerTest {
     
     @BeforeClass
     public static void beforeClass() {
@@ -46,7 +45,7 @@ public final class DataConsistencyCheckerTest {
     
     @Test
     public void assertCountAndDataCheck() throws SQLException {
-        Map<String, DataConsistencyCheckResult> actual = new 
DataConsistencyChecker(createJobConfiguration(), null).check(new 
DataConsistencyCalculateAlgorithmFixture());
+        Map<String, DataConsistencyCheckResult> actual = new 
MigrationDataConsistencyChecker(createJobConfiguration(), null).check(new 
DataConsistencyCalculateAlgorithmFixture());
         assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
         
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), 
is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
         assertTrue(actual.get("t_order").getContentCheckResult().isMatched());

Reply via email to