sandynz commented on code in PR #21194:
URL: https://github.com/apache/shardingsphere/pull/21194#discussion_r979746251


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobResultConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Consistency check job configuration for YAML.
+ */
+@Getter
+@Setter
+@Slf4j
+@ToString
+public final class YamlConsistencyCheckJobResultConfiguration implements 
YamlPipelineJobConfiguration {
+    
+    private String jobId;
+    
+    private String referredJobId;
+    
+    private String algorithmTypeName;
+    
+    @Override
+    public String getTargetDatabaseName() {
+        return null;
+    }

Review Comment:
   It's better to throw UnsupportedOperationException



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobResultConfiguration.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Consistency check job configuration for YAML.
+ */
+@Getter
+@Setter
+@Slf4j
+@ToString
+public final class YamlConsistencyCheckJobResultConfiguration implements 
YamlPipelineJobConfiguration {
+    
+    private String jobId;
+    
+    private String referredJobId;

Review Comment:
   Could we rename it to `parentJobId`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.exception.job;
+
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+
+/**
+ * Pipeline job has already existed exception.
+ */
+public final class PipelineJobHasAlreadyExistedException extends 
PipelineSQLException {
+    
+    private static final long serialVersionUID = 2854259384634892428L;
+    
+    public PipelineJobHasAlreadyExistedException(final String jobId) {
+        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already existed", 
jobId);
+    }

Review Comment:
   Error code `81` is already used by `PipelineJobHasAlreadyStartedException`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML consistency check job configuration swapper.
+ */
+public final class YamlConsistencyCheckJobConfigurationSwapper implements 
YamlConfigurationSwapper<YamlConsistencyCheckJobResultConfiguration, 
ConsistencyCheckJobConfiguration> {

Review Comment:
   > <YamlConsistencyCheckJobResultConfiguration, 
ConsistencyCheckJobConfiguration>
   
   Could we rename `YamlConsistencyCheckJobResultConfiguration` to 
`YamlConsistencyCheckJobConfiguration`? It looks strange as is.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultConfiguration.YamlDataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Consistency check job.
+ */
+@Slf4j
+public final class ConsistencyCheckJob extends AbstractPipelineJob implements 
SimpleJob, PipelineJob {
+    
+    private static final YamlDataConsistencyCheckResultSwapper 
CHECK_RESULT_SWAPPER = new YamlDataConsistencyCheckResultSwapper();
+    
+    private final ConsistencyCheckJobAPI jobAPI = 
ConsistencyCheckJobAPIFactory.getInstance();
+    
+    private final PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
+    
+    @Override
+    public void execute(final ShardingContext shardingContext) {
+        String checkJobId = shardingContext.getJobName();
+        setJobId(checkJobId);
+        ConsistencyCheckJobConfiguration consistencyCheckJobConfig = 
YamlConsistencyCheckJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
+        ConsistencyCheckJobItemContext jobItemContext = new 
ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, 
JobStatus.FINISHED);
+        jobAPI.persistJobItemProgress(jobItemContext);
+        String referredJobId = consistencyCheckJobConfig.getReferredJobId();
+        log.info("execute consistency check, job id:{}, referred job id:{}", 
checkJobId, referredJobId);
+        JobType jobType = PipelineJobIdUtils.parseJobType(referredJobId);
+        InventoryIncrementalJobPublicAPI jobPublicAPI = 
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
+        Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
+        if 
(StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())) {
+            dataConsistencyCheckResult = 
jobPublicAPI.dataConsistencyCheck(referredJobId);
+        } else {
+            dataConsistencyCheckResult = 
jobPublicAPI.dataConsistencyCheck(referredJobId, 
consistencyCheckJobConfig.getAlgorithmTypeName(), null);

Review Comment:
   The 3rd parameter `algorithmProps` is missing



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import 
org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+
+/**
+ * Consistency check job API.
+ */
+public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, 
InventoryIncrementalJobAPI {

Review Comment:
   It should not extend `InventoryIncrementalJobAPI`; `PipelineJobAPI` could 
be used, maybe plus `PipelineJobItemAPI`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java:
##########
@@ -35,15 +32,31 @@ public final class ConsistencyCheckJobId extends 
AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
     
     private final String pipelineJobId;
     
-    private final String createTimeMinutes;
+    private final Integer consistencyCheckVersion;

Review Comment:
   `Integer` could be `int`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Consistency check job configuration changed processor.
+ */
+@Slf4j
+public final class ConsistencyCheckChangedJobConfigurationProcessor implements 
PipelineChangedJobConfigurationProcessor {
+    
+    @Override
+    public void process(final DataChangedEvent.Type eventType, final 
JobConfigurationPOJO jobConfigPOJO) {
+        String jobId = jobConfigPOJO.getJobName();
+        if (jobConfigPOJO.isDisabled()) {
+            log.info("{} is disabled", jobId);
+            PipelineJobCenter.stop(jobId);
+            return;
+        }
+        switch (eventType) {
+            case ADDED:
+            case UPDATED:
+                if (PipelineJobCenter.isJobExisting(jobId)) {
+                    log.info("{} added to executing jobs failed since it 
already exists", jobId);
+                } else {
+                    log.info("{} executing jobs", jobId);
+                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), 
PipelineContext.getEventListenerExecutor());
+                }
+                break;
+            case DELETED:
+                log.info("deleted consistency check job id: {}", jobId);
+                PipelineJobCenter.stop(jobId);
+                break;
+            default:
+                break;
+        }
+    }
+    
+    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+        ConsistencyCheckJob job = new ConsistencyCheckJob();
+        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+        OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, 
jobConfigPOJO.toJobConfiguration());
+        oneOffJobBootstrap.execute();
+        job.setOneOffJobBootstrap(oneOffJobBootstrap);

Review Comment:
   It's better to put `job.setOneOffJobBootstrap(oneOffJobBootstrap);` before 
`oneOffJobBootstrap.execute();`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Consistency check job item context.
+ */
+@Getter
+@Setter
+@Slf4j
+public final class ConsistencyCheckJobItemContext implements 
InventoryIncrementalJobItemContext {

Review Comment:
   It should not implement `InventoryIncrementalJobItemContext`; 
`PipelineJobItemContext` could be used



##########
shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java:
##########
@@ -159,10 +162,20 @@ protected String getJobIdByTableName(final String 
tableName) {
         return jobList.stream().filter(a -> 
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new 
RuntimeException("not find " + tableName + " table")).get("id").toString();
     }
     
-    protected void assertCheckMigrationSuccess(final String jobId, final 
String algorithmType) {
-        List<Map<String, Object>> checkJobResults = 
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", 
jobId, algorithmType));
+    protected void assertCheckMigrationSuccess(final String jobId, final 
String algorithmType) throws SQLException {
+        proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE 
(NAME='%s')", jobId, algorithmType), 0);
+        List<Map<String, Object>> checkJobResults = Collections.emptyList();
+        for (int i = 0; i < 10; i++) {
+            checkJobResults = queryForListWithLog(String.format("SHOW 
MIGRATION CHECK STATUS '%s'", jobId));
+            if (null != checkJobResults && !checkJobResults.isEmpty()) {
+                break;
+            }
+            ThreadUtil.sleep(3, TimeUnit.SECONDS);
+        }

Review Comment:
   Are the for loop and sleep still needed?



##########
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public final class ConsistencyCheckJobAPIFactoryTest {
+    
+    @Test
+    public void assertGetInstance() {
+        assertThat(ConsistencyCheckJobAPIFactory.getInstance(), 
instanceOf(ConsistencyCheckJobAPIImpl.class));
+    }
+}

Review Comment:
   Other API factories need unit tests for ConsistencyCheckJobAPI too



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java:
##########
@@ -35,15 +32,31 @@ public final class ConsistencyCheckJobId extends 
AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
     
     private final String pipelineJobId;
     
-    private final String createTimeMinutes;
+    private final Integer consistencyCheckVersion;

Review Comment:
   And could we rename it to `sequence` or something similar?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateConsistencyCheckJobParameter.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.pojo;
+
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Create consistency check job parameter.
+ */
+@Data
+@RequiredArgsConstructor
+public final class CreateConsistencyCheckJobParameter {
+    
+    private final String jobId;
+    
+    private final String algorithmTypeName;
+}

Review Comment:
   It seems the algorithm properties are missing



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResultConfiguration.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+import java.util.Map;
+
+/**
+ * Yaml data consistency check result config.
+ */
+@Getter
+@Setter
+@ToString
+public final class YamlDataConsistencyCheckResultConfiguration {
+    

Review Comment:
   Could we just use `YamlDataConsistencyCheckResult`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Consistency check job configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+@Slf4j
+@ToString
+public final class ConsistencyCheckJobConfiguration implements 
PipelineJobConfiguration {
+    
+    private final String jobId;
+    
+    private final String referredJobId;
+    
+    private final String algorithmTypeName;
+    
+    @Override
+    public String getSourceDatabaseType() {
+        return null;
+    }

Review Comment:
   It's better to throw UnsupportedOperationException



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobResultConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Consistency check job API impl.
+ */
+@Slf4j
+public final class ConsistencyCheckJobAPIImpl extends 
AbstractInventoryIncrementalJobAPIImpl implements ConsistencyCheckJobAPI {
+    
+    @Override
+    public JobType getJobType() {
+        return JobType.CONSISTENCY_CHECK;
+    }

Review Comment:
   `getJobType()` could be moved to the bottom of the class



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api;
+
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+
+import java.util.Map;
+
+/**
+ * Migration job public API.
+ */
+@SingletonSPI
+public interface ConsistencyCheckJobPublicAPI extends 
InventoryIncrementalJobPublicAPI, RequiredSPI {

Review Comment:
   It should not extend `InventoryIncrementalJobPublicAPI`; 
`PipelineJobPublicAPI` might be the better one



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Consistency check job configuration changed processor.
+ */
+@Slf4j
+public final class ConsistencyCheckChangedJobConfigurationProcessor implements 
PipelineChangedJobConfigurationProcessor {
+    
+    @Override
+    public void process(final DataChangedEvent.Type eventType, final 
JobConfigurationPOJO jobConfigPOJO) {
+        String jobId = jobConfigPOJO.getJobName();
+        if (jobConfigPOJO.isDisabled()) {
+            log.info("{} is disabled", jobId);
+            PipelineJobCenter.stop(jobId);
+            return;
+        }
+        switch (eventType) {
+            case ADDED:
+            case UPDATED:
+                if (PipelineJobCenter.isJobExisting(jobId)) {
+                    log.info("{} added to executing jobs failed since it 
already exists", jobId);
+                } else {
+                    log.info("{} executing jobs", jobId);
+                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), 
PipelineContext.getEventListenerExecutor());

Review Comment:
   `whenComplete` should be added for `CompletableFuture.runAsync`



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobResultConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Consistency check job API impl.
+ */
+@Slf4j
+public final class ConsistencyCheckJobAPIImpl extends 
AbstractInventoryIncrementalJobAPIImpl implements ConsistencyCheckJobAPI {
+    

Review Comment:
   It should not extend `AbstractInventoryIncrementalJobAPIImpl`; 
`AbstractPipelineJobAPIImpl` could be used



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java:
##########
@@ -35,15 +32,31 @@ public final class ConsistencyCheckJobId extends 
AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
     
     private final String pipelineJobId;
     
-    private final String createTimeMinutes;
+    private final Integer consistencyCheckVersion;
     
-    public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final 
long createTimeMillis) {
+    public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final 
int consistencyCheckVersion) {
         super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
         this.pipelineJobId = pipelineJobId;
-        this.createTimeMinutes = 
DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(createTimeMillis));
+        if (consistencyCheckVersion > MAX_CONSISTENCY_CHECK_VERSION) {
+            this.consistencyCheckVersion = 0;
+        } else {
+            this.consistencyCheckVersion = consistencyCheckVersion;
+        }
+    }
+    
+    /**
+     * Get consistency check version.
+     *
+     * @param consistencyCheckJobId consistency check job id.
+     * @return consistency check version
+     */
+    public static int getConsistencyCheckVersion(final @NonNull String 
consistencyCheckJobId) {
+        String versionString = 
consistencyCheckJobId.substring(consistencyCheckJobId.length() - 1);
+        int version = Integer.parseInt(versionString);
+        return version > MAX_CONSISTENCY_CHECK_VERSION ? 0 : version;

Review Comment:
   It's better not to change the version any more after it's constructed



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java:
##########
@@ -72,12 +72,12 @@ public void assertGetJobConfigPath() {
     
     @Test
     public void assertGetCheckLatestResultPath() {
-        assertThat(PipelineMetaDataNode.getCheckLatestResultPath(jobId), 
is(jobCheckRootPath + "/latest_result"));
+        assertThat(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), 
is(jobCheckRootPath + "/latest_job_id"));
     }
     
     @Test
-    public void assertGetCheckLatestDetailedResultPath() {
-        
assertThat(PipelineMetaDataNode.getCheckLatestDetailedResultPath(jobId), 
is(jobCheckRootPath + "/latest_detailed_result"));
+    public void assertgetCheckJobResultPath() {
+        assertThat(PipelineMetaDataNode.getCheckJobResultPath(jobId, 
"j02fx123"), is(jobCheckRootPath + "/job_ids/j02fx123"));
     }

Review Comment:
   These 2 method names could be updated



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobResultConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Consistency check job API impl.
+ */
+@Slf4j
+public final class ConsistencyCheckJobAPIImpl extends 
AbstractInventoryIncrementalJobAPIImpl implements ConsistencyCheckJobAPI {
+    
+    @Override
+    public JobType getJobType() {
+        return JobType.CONSISTENCY_CHECK;
+    }

Review Comment:
   And also `getJobClassName()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to