This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9df5c119691 Add algorithm columns in "show migration check status" DistSQL result (#28402)
9df5c119691 is described below

commit 9df5c119691ef645cc4c56db6ea2a4184e0eace2
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Sep 10 17:32:21 2023 +0800

    Add algorithm columns in "show migration check status" DistSQL result (#28402)
    
    * Impl getSourceDatabaseType() for ConsistencyCheckJobConfiguration
    
    * Improve createIncrementalTaskProgress code style
    
    * Add algorithm columns in "show migration check status" DistSQL result
    
    * Improve migration general E2E to shorten waiting time in waitIncrementTaskFinished
---
 .../data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java |  4 ++++
 .../data/pipeline/core/task/PipelineTaskUtils.java             | 10 +++++-----
 .../handler/query/ShowMigrationCheckStatusExecutor.java        |  7 ++++---
 .../handler/query/ShowMigrationCheckStatusExecutorTest.java    |  4 +++-
 .../consistencycheck/api/impl/ConsistencyCheckJobAPI.java      |  5 +++++
 .../config/ConsistencyCheckJobConfiguration.java               |  5 +----
 .../config/yaml/YamlConsistencyCheckJobConfiguration.java      |  2 ++
 .../yaml/YamlConsistencyCheckJobConfigurationSwapper.java      |  6 +++++-
 .../context/ConsistencyCheckJobItemContextTest.java            | 10 ++++++++--
 .../cases/migration/general/MySQLMigrationGeneralE2EIT.java    |  5 +++--
 .../consistencycheck/api/impl/ConsistencyCheckJobAPITest.java  |  4 +++-
 11 files changed, 43 insertions(+), 19 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
index 7478a6a3622..0fa4f6cb77f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/ConsistencyCheckJobItemInfo.java
@@ -45,5 +45,9 @@ public final class ConsistencyCheckJobItemInfo {
     
     private long durationSeconds;
     
+    private String algorithmType;
+    
+    private String algorithmProps;
+    
     private String errorMessage;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index 49d29c2405b..c271c1f86d6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -51,14 +51,14 @@ public final class PipelineTaskUtils {
      * Create incremental task progress.
      *
      * @param position ingest position
-     * @param jobItemProgress job item progress
+     * @param initProgress initial job item progress
      * @return incremental task progress
      */
-    public static IncrementalTaskProgress createIncrementalTaskProgress(final 
IngestPosition position, final InventoryIncrementalJobItemProgress 
jobItemProgress) {
+    public static IncrementalTaskProgress createIncrementalTaskProgress(final 
IngestPosition position, final InventoryIncrementalJobItemProgress 
initProgress) {
         IncrementalTaskProgress result = new IncrementalTaskProgress(position);
-        if (null != jobItemProgress && null != 
jobItemProgress.getIncremental()) {
-            
Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
-                    .ifPresent(optional -> 
result.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
+        if (null != initProgress && null != initProgress.getIncremental()) {
+            
Optional.ofNullable(initProgress.getIncremental().getIncrementalTaskProgress())
+                    .ifPresent(optional -> 
result.setIncrementalTaskDelay(initProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
         }
         return result;
     }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
index 6b27af0a606..98202a3c5b6 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
@@ -50,13 +50,14 @@ public final class ShowMigrationCheckStatusExecutor 
implements QueryableRALExecu
         String checkResult = null == info.getCheckSuccess() ? "" : 
info.getCheckSuccess().toString();
         return new 
LocalDataQueryResultRow(Optional.ofNullable(info.getTableNames()).orElse(""), 
checkResult, Optional.ofNullable(info.getCheckFailedTableNames()).orElse(""),
                 String.valueOf(info.getFinishedPercentage()), 
info.getRemainingSeconds(),
-                Optional.ofNullable(info.getCheckBeginTime()).orElse(""), 
Optional.ofNullable(info.getCheckEndTime()).orElse(""),
-                info.getDurationSeconds(), 
Optional.ofNullable(info.getErrorMessage()).orElse(""));
+                Optional.ofNullable(info.getCheckBeginTime()).orElse(""), 
Optional.ofNullable(info.getCheckEndTime()).orElse(""), 
info.getDurationSeconds(),
+                info.getAlgorithmType(), 
Optional.ofNullable(info.getAlgorithmProps()).orElse(""), 
Optional.ofNullable(info.getErrorMessage()).orElse(""));
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("tables", "result", "check_failed_tables", 
"finished_percentage", "remaining_seconds", "check_begin_time", 
"check_end_time", "duration_seconds", "error_message");
+        return Arrays.asList("tables", "result", "check_failed_tables", 
"finished_percentage", "remaining_seconds", "check_begin_time", 
"check_end_time", "duration_seconds",
+                "algorithm_type", "algorithm_props", "error_message");
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
 
b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
index 64deb6be459..fa45d54fb3a 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
@@ -32,7 +32,7 @@ class ShowMigrationCheckStatusExecutorTest {
     @Test
     void assertGetColumnNames() {
         Collection<String> columns = executor.getColumnNames();
-        assertThat(columns.size(), is(9));
+        assertThat(columns.size(), is(11));
         Iterator<String> iterator = columns.iterator();
         assertThat(iterator.next(), is("tables"));
         assertThat(iterator.next(), is("result"));
@@ -42,6 +42,8 @@ class ShowMigrationCheckStatusExecutorTest {
         assertThat(iterator.next(), is("check_begin_time"));
         assertThat(iterator.next(), is("check_end_time"));
         assertThat(iterator.next(), is("duration_seconds"));
+        assertThat(iterator.next(), is("algorithm_type"));
+        assertThat(iterator.next(), is("algorithm_props"));
         assertThat(iterator.next(), is("error_message"));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index d87119b58c3..37acaab9719 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -333,6 +333,11 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         String tableNames = jobItemProgress.getTableNames();
         result.setTableNames(Optional.ofNullable(tableNames).orElse(""));
         result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
+        ConsistencyCheckJobConfiguration jobConfig = 
getJobConfiguration(checkJobId);
+        result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
+        if (null != jobConfig.getAlgorithmProps()) {
+            
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry
 -> String.format("'%s'='%s'", entry.getKey(), 
entry.getValue())).collect(Collectors.joining(",")));
+        }
         result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
         Map<String, TableDataConsistencyCheckResult> checkJobResult = 
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
         if (checkJobResult.isEmpty()) {
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
index c0aeda44758..a969417d96f 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
@@ -41,10 +41,7 @@ public final class ConsistencyCheckJobConfiguration 
implements PipelineJobConfig
     
     private final Properties algorithmProps;
     
-    @Override
-    public DatabaseType getSourceDatabaseType() {
-        throw new UnsupportedOperationException("");
-    }
+    private final DatabaseType sourceDatabaseType;
     
     /**
      * Get job sharding count.
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
index 06e840d2438..3bf659400d3 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfiguration.java
@@ -38,6 +38,8 @@ public final class YamlConsistencyCheckJobConfiguration 
implements YamlPipelineJ
     
     private Properties algorithmProps;
     
+    private String sourceDatabaseType;
+    
     @Override
     public String getDatabaseName() {
         throw new UnsupportedOperationException("");
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
index 66736fc384f..5431c935f21 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -18,6 +18,8 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml;
 
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
@@ -33,12 +35,14 @@ public final class 
YamlConsistencyCheckJobConfigurationSwapper implements YamlCo
         result.setParentJobId(data.getParentJobId());
         result.setAlgorithmTypeName(data.getAlgorithmTypeName());
         result.setAlgorithmProps(data.getAlgorithmProps());
+        result.setSourceDatabaseType(null == data.getSourceDatabaseType() ? 
null : data.getSourceDatabaseType().getType());
         return result;
     }
     
     @Override
     public ConsistencyCheckJobConfiguration swapToObject(final 
YamlConsistencyCheckJobConfiguration yamlConfig) {
-        return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), 
yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), 
yamlConfig.getAlgorithmProps());
+        DatabaseType databaseType = null == yamlConfig.getSourceDatabaseType() 
? null : TypedSPILoader.getService(DatabaseType.class, 
yamlConfig.getSourceDatabaseType());
+        return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), 
yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), 
yamlConfig.getAlgorithmProps(), databaseType);
     }
     
     /**
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
index 32f4ac70e18..753f5e9105f 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -22,6 +22,8 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -35,12 +37,15 @@ class ConsistencyCheckJobItemContextTest {
     
     private static final String TABLE = "t_order";
     
+    private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "H2");
+    
     @Test
     void assertConstructWithoutTableCheckPositions() {
         Map<String, Object> sourceTableCheckPositions = Collections.emptyMap();
         Map<String, Object> targetTableCheckPositions = Collections.emptyMap();
         ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, 
sourceTableCheckPositions, targetTableCheckPositions);
-        ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null), 0, JobStatus.RUNNING, jobItemProgress);
+        ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null, databaseType),
+                0, JobStatus.RUNNING, jobItemProgress);
         verifyProgressContext(actual.getProgressContext(), 0, 
sourceTableCheckPositions, targetTableCheckPositions);
     }
     
@@ -49,7 +54,8 @@ class ConsistencyCheckJobItemContextTest {
         Map<String, Object> sourceTableCheckPositions = ImmutableMap.of(TABLE, 
6);
         Map<String, Object> targetTableCheckPositions = ImmutableMap.of(TABLE, 
5);
         ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, 
sourceTableCheckPositions, targetTableCheckPositions);
-        ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null), 0, JobStatus.RUNNING, jobItemProgress);
+        ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null, databaseType),
+                0, JobStatus.RUNNING, jobItemProgress);
         verifyProgressContext(actual.getProgressContext(), 1, 
sourceTableCheckPositions, targetTableCheckPositions);
         
assertThat(actual.getProgressContext().getSourceTableCheckPositions().get(TABLE),
 is(6));
         
assertThat(actual.getProgressContext().getTargetTableCheckPositions().get(TABLE),
 is(5));
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 740f7c52395..e48d3d4c572 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -84,10 +84,12 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
                     new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, 
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+            containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item 
(item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')");
             stopMigrationByJobId(containerComposer, orderJobId);
             startMigrationByJobId(containerComposer, orderJobId);
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", 10000);
+            containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order_item", 10000);
             Properties algorithmProps = new Properties();
             algorithmProps.setProperty("chunk-size", "300");
             assertMigrationSuccessById(containerComposer, orderJobId, 
"DATA_MATCH", algorithmProps);
@@ -98,8 +100,7 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             for (String each : listJobId(containerComposer)) {
                 commitMigrationByJobId(containerComposer, each);
             }
-            List<String> lastJobIds = listJobId(containerComposer);
-            assertTrue(lastJobIds.isEmpty());
+            assertTrue(listJobId(containerComposer).isEmpty());
             
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
         }
     }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index c6f78a04282..962f3340821 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -32,6 +32,8 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
@@ -99,7 +101,7 @@ class ConsistencyCheckJobAPITest {
             String checkJobId = checkJobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
                     parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
             ConsistencyCheckJobItemContext checkJobItemContext = new 
ConsistencyCheckJobItemContext(
-                    new ConsistencyCheckJobConfiguration(checkJobId, 
parentJobId, null, null), 0, JobStatus.FINISHED, null);
+                    new ConsistencyCheckJobConfiguration(checkJobId, 
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 
0, JobStatus.FINISHED, null);
             checkJobAPI.persistJobItemProgress(checkJobItemContext);
             Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheckResult = Collections.singletonMap("t_order",
                     new TableDataConsistencyCheckResult(new 
TableDataConsistencyCountCheckResult(0, 0), new 
TableDataConsistencyContentCheckResult(true)));

Reply via email to