This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b736453ea5 Refactor PipelineJobIdUtils (#29283)
0b736453ea5 is described below

commit 0b736453ea5649343de9457111d21caf7f7db311
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Dec 4 23:45:52 2023 +0800

    Refactor PipelineJobIdUtils (#29283)
    
    * Revise javadoc of PipelineJobId
    
    * Refactor PipelineJobIdUtils
---
 .../data/pipeline/common/job/PipelineJobId.java         | 16 +++++++++++++---
 .../data/pipeline/core/job/PipelineJobIdUtils.java      | 17 +++++++++++++----
 .../shardingsphere/data/pipeline/cdc/CDCJobId.java      | 11 ++---------
 .../shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java |  2 +-
 .../shardingsphere/data/pipeline/cdc/CDCJobIdTest.java  |  4 +++-
 .../consistencycheck/ConsistencyCheckJobId.java         |  8 ++------
 .../consistencycheck/api/ConsistencyCheckJobAPI.java    |  5 +++--
 .../pipeline/scenario/migration/MigrationJobId.java     | 12 ++----------
 .../scenario/migration/api/MigrationJobAPI.java         |  2 +-
 .../data/pipeline/core/job/PipelineJobIdUtilsTest.java  |  2 +-
 .../pipeline/core/util/JobConfigurationBuilder.java     |  3 ++-
 .../consistencycheck/ConsistencyCheckJobTest.java       |  3 ++-
 .../api/impl/ConsistencyCheckJobAPITest.java            |  2 +-
 13 files changed, 46 insertions(+), 41 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
index 88d46ab720d..54e492672c2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
@@ -17,6 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.common.job;
 
+import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+
 /**
  * Pipeline job id.
  */
@@ -25,9 +28,16 @@ public interface PipelineJobId {
     String CURRENT_VERSION = "02";
     
     /**
-     * Marshal job ID.
+     * Get pipeline job type.
      * 
-     * @return job ID
+     * @return pipeline job type
+     */
+    PipelineJobType getJobType();
+    
+    /**
+     * Get pipeline context key.
+     *
+     * @return pipeline context key
      */
-    String marshal();
+    PipelineContextKey getContextKey();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index add4d57766d..2ebadee92d7 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -24,6 +24,7 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
@@ -34,6 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
 
 import java.nio.charset.StandardCharsets;
 
@@ -46,11 +48,14 @@ public final class PipelineJobIdUtils {
     /**
      * Marshal job id prefix.
      *
-     * @param jobType pipeline job type
-     * @param contextKey pipeline context key
-     * @return job id common prefix
+     * @param jobId pipeline job id
+     * @return job id
      */
-    public static String marshalPrefix(final PipelineJobType jobType, final 
PipelineContextKey contextKey) {
+    public static String marshal(final PipelineJobId jobId) {
+        return marshalPrefix(jobId.getJobType(), jobId.getContextKey()) + 
marshalSuffix(jobId);
+    }
+    
+    private static String marshalPrefix(final PipelineJobType jobType, final 
PipelineContextKey contextKey) {
         InstanceType instanceType = contextKey.getInstanceType();
         String databaseName = instanceType == InstanceType.PROXY ? "" : 
contextKey.getDatabaseName();
         String databaseNameHex = 
Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
@@ -59,6 +64,10 @@ public final class PipelineJobIdUtils {
         return 'j' + jobType.getCode() + PipelineJobId.CURRENT_VERSION + 
encodedInstanceType + databaseNameLengthHex + databaseNameHex;
     }
     
+    private static String marshalSuffix(final PipelineJobId jobId) {
+        return 
DigestUtils.md5Hex(JsonUtils.toJsonString(jobId).getBytes(StandardCharsets.UTF_8));
+    }
+    
     /**
      * Parse job type.
      *
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
index 31e406c53ca..86ecadbba18 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
@@ -17,21 +17,19 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc;
 
-import com.google.common.base.Joiner;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 /**
  * CDC job id.
  */
 @RequiredArgsConstructor
+@Getter
 public final class CDCJobId implements PipelineJobId {
     
     private final PipelineJobType jobType = new CDCJobType();
@@ -41,9 +39,4 @@ public final class CDCJobId implements PipelineJobId {
     private final List<String> schemaTableNames;
     
     private final boolean full;
-    
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + 
DigestUtils.md5Hex(Joiner.on('|').join(contextKey.getDatabaseName(), 
schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index d3667fd8111..ea30013cd48 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -134,7 +134,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     
     private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final 
StreamDataParameter param, final CDCSinkType sinkType, final Properties 
sinkProps, final PipelineContextKey contextKey) {
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
-        result.setJobId(new CDCJobId(contextKey, param.getSchemaTableNames(), 
param.isFull()).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, 
param.getSchemaTableNames(), param.isFull())));
         result.setDatabaseName(param.getDatabaseName());
         result.setSchemaTableNames(param.getSchemaTableNames());
         result.setFull(param.isFull());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
index b7657475f2f..f0d0c0b0bee 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
@@ -22,6 +22,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -29,7 +31,7 @@ class CDCJobIdTest {
     
     @Test
     void assertParseJobType() {
-        String jobId = PipelineJobIdUtils.marshalPrefix(new CDCJobType(), new 
PipelineContextKey("sharding_db", InstanceType.PROXY)) + "abcd";
+        String jobId = PipelineJobIdUtils.marshal(new CDCJobId(new 
PipelineContextKey("sharding_db", InstanceType.PROXY), 
Collections.singletonList("foo"), true));
         assertThat(PipelineJobIdUtils.parseJobType(jobId), 
instanceOf(CDCJobType.class));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 295145dc2f8..06dfe9ee8ab 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -17,15 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
+import lombok.Getter;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 
 /**
  * Consistency check job id.
  */
+@Getter
 public final class ConsistencyCheckJobId implements PipelineJobId {
     
     private final PipelineJobType jobType = new ConsistencyCheckJobType();
@@ -59,9 +60,4 @@ public final class ConsistencyCheckJobId implements 
PipelineJobId {
     public static int parseSequence(final String checkJobId) {
         return Integer.parseInt(checkJobId.substring(checkJobId.length() - 1));
     }
-    
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + 
parentJobId + sequence;
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index e4b7f31ebea..be1d5ee6f60 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -96,7 +96,8 @@ public final class ConsistencyCheckJobAPI {
         }
         checkPipelineDatabaseTypes(param);
         PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(parentJobId);
-        String result = latestCheckJobId.map(optional -> new 
ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new 
ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
+        String result = PipelineJobIdUtils.marshal(
+                latestCheckJobId.map(optional -> new 
ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new 
ConsistencyCheckJobId(contextKey, parentJobId)));
         
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, 
result);
         
governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, 
result);
         jobManager.drop(result);
@@ -155,7 +156,7 @@ public final class ConsistencyCheckJobAPI {
         Optional<Integer> previousSequence = 
ConsistencyCheckSequence.getPreviousSequence(
                 
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()),
 ConsistencyCheckJobId.parseSequence(latestCheckJobId));
         if (previousSequence.isPresent()) {
-            String checkJobId = new ConsistencyCheckJobId(contextKey, 
parentJobId, previousSequence.get()).marshal();
+            String checkJobId = PipelineJobIdUtils.marshal(new 
ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()));
             
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, 
checkJobId);
         } else {
             
governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index b21e1de4718..b0a13d30437 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -17,28 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-
-import java.nio.charset.StandardCharsets;
 
 /**
  * Migration job id.
  */
 @RequiredArgsConstructor
+@Getter
 public final class MigrationJobId implements PipelineJobId {
     
     private final PipelineJobType jobType = new MigrationJobType();
     
     private final PipelineContextKey contextKey;
-    
-    @Override
-    public String marshal() {
-        return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + 
DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index f55ae220f42..75d1bf91c5e 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -162,7 +162,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
         
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
         result.setTablesFirstDataNodes(new 
JobDataNodeLine(tablesFirstDataNodes).marshal());
         
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        result.setJobId(new MigrationJobId(contextKey).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new 
MigrationJobId(contextKey)));
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
 
b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 6af65a7a0bc..f7c7bcf31bb 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -38,7 +38,7 @@ class PipelineJobIdUtilsTest {
     
     private void assertParse0(final InstanceType instanceType) {
         PipelineContextKey contextKey = new PipelineContextKey("sharding_db", 
instanceType);
-        String jobId = new MigrationJobId(contextKey).marshal();
+        String jobId = PipelineJobIdUtils.marshal(new 
MigrationJobId(contextKey));
         assertThat(PipelineJobIdUtils.parseJobType(jobId), 
instanceOf(MigrationJobType.class));
         PipelineContextKey actualContextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
         assertThat(actualContextKey.getInstanceType(), is(instanceType));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 1f27e007999..8a63105604c 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
@@ -76,7 +77,7 @@ public final class JobConfigurationBuilder {
         result.setTablesFirstDataNodes("t_order:ds_0.t_order");
         
result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
         PipelineContextKey contextKey = new 
PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
-        result.setJobId(new MigrationJobId(contextKey).marshal());
+        result.setJobId(PipelineJobIdUtils.marshal(new 
MigrationJobId(contextKey)));
         Map<String, YamlPipelineDataSourceConfiguration> sources = new 
LinkedHashMap<>();
         String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
         PipelineDataSourceConfiguration sourceDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 1905c5dd263..9f86dcef8f4 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -49,7 +50,7 @@ class ConsistencyCheckJobTest {
     @Test
     void assertBuildPipelineJobItemContext() {
         ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new 
PipelineContextKey(InstanceType.PROXY), 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
-        String checkJobId = pipelineJobId.marshal();
+        String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
         Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index e3b53e10fcf..902bc94a93a 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -72,7 +72,7 @@ class ConsistencyCheckJobAPITest {
                 parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
         ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
-        String expectCheckJobId = new 
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), 
parentJobId, expectedSequence).marshal();
+        String expectCheckJobId = PipelineJobIdUtils.marshal(new 
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), 
parentJobId, expectedSequence));
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
         assertNull(checkJobConfig.getAlgorithmTypeName());
         int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);

Reply via email to