This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0b736453ea5 Refactor PipelineJobIdUtils (#29283)
0b736453ea5 is described below
commit 0b736453ea5649343de9457111d21caf7f7db311
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Dec 4 23:45:52 2023 +0800
Refactor PipelineJobIdUtils (#29283)
* Revise javadoc of PipelineJobId
* Refactor PipelineJobIdUtils
---
.../data/pipeline/common/job/PipelineJobId.java | 16 +++++++++++++---
.../data/pipeline/core/job/PipelineJobIdUtils.java | 17 +++++++++++++----
.../shardingsphere/data/pipeline/cdc/CDCJobId.java | 11 ++---------
.../shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java | 2 +-
.../shardingsphere/data/pipeline/cdc/CDCJobIdTest.java | 4 +++-
.../consistencycheck/ConsistencyCheckJobId.java | 8 ++------
.../consistencycheck/api/ConsistencyCheckJobAPI.java | 5 +++--
.../pipeline/scenario/migration/MigrationJobId.java | 12 ++----------
.../scenario/migration/api/MigrationJobAPI.java | 2 +-
.../data/pipeline/core/job/PipelineJobIdUtilsTest.java | 2 +-
.../pipeline/core/util/JobConfigurationBuilder.java | 3 ++-
.../consistencycheck/ConsistencyCheckJobTest.java | 3 ++-
.../api/impl/ConsistencyCheckJobAPITest.java | 2 +-
13 files changed, 46 insertions(+), 41 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
index 88d46ab720d..54e492672c2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
@@ -17,6 +17,9 @@
package org.apache.shardingsphere.data.pipeline.common.job;
+import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+
/**
* Pipeline job id.
*/
@@ -25,9 +28,16 @@ public interface PipelineJobId {
String CURRENT_VERSION = "02";
/**
- * Marshal job ID.
+ * Get pipeline job type.
*
- * @return job ID
+ * @return pipeline job type
+ */
+ PipelineJobType getJobType();
+
+ /**
+ * Get pipeline context key.
+ *
+ * @return pipeline context key
*/
- String marshal();
+ PipelineContextKey getContextKey();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index add4d57766d..2ebadee92d7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -24,6 +24,7 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
@@ -34,6 +35,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
import java.nio.charset.StandardCharsets;
@@ -46,11 +48,14 @@ public final class PipelineJobIdUtils {
/**
* Marshal job id prefix.
*
- * @param jobType pipeline job type
- * @param contextKey pipeline context key
- * @return job id common prefix
+ * @param jobId pipeline job id
+ * @return job id
*/
- public static String marshalPrefix(final PipelineJobType jobType, final
PipelineContextKey contextKey) {
+ public static String marshal(final PipelineJobId jobId) {
+ return marshalPrefix(jobId.getJobType(), jobId.getContextKey()) +
marshalSuffix(jobId);
+ }
+
+ private static String marshalPrefix(final PipelineJobType jobType, final
PipelineContextKey contextKey) {
InstanceType instanceType = contextKey.getInstanceType();
String databaseName = instanceType == InstanceType.PROXY ? "" :
contextKey.getDatabaseName();
String databaseNameHex =
Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
@@ -59,6 +64,10 @@ public final class PipelineJobIdUtils {
return 'j' + jobType.getCode() + PipelineJobId.CURRENT_VERSION +
encodedInstanceType + databaseNameLengthHex + databaseNameHex;
}
+ private static String marshalSuffix(final PipelineJobId jobId) {
+ return
DigestUtils.md5Hex(JsonUtils.toJsonString(jobId).getBytes(StandardCharsets.UTF_8));
+ }
+
/**
* Parse job type.
*
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
index 31e406c53ca..86ecadbba18 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
@@ -17,21 +17,19 @@
package org.apache.shardingsphere.data.pipeline.cdc;
-import com.google.common.base.Joiner;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* CDC job id.
*/
@RequiredArgsConstructor
+@Getter
public final class CDCJobId implements PipelineJobId {
private final PipelineJobType jobType = new CDCJobType();
@@ -41,9 +39,4 @@ public final class CDCJobId implements PipelineJobId {
private final List<String> schemaTableNames;
private final boolean full;
-
- @Override
- public String marshal() {
- return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) +
DigestUtils.md5Hex(Joiner.on('|').join(contextKey.getDatabaseName(),
schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
- }
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index d3667fd8111..ea30013cd48 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -134,7 +134,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final
StreamDataParameter param, final CDCSinkType sinkType, final Properties
sinkProps, final PipelineContextKey contextKey) {
YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
- result.setJobId(new CDCJobId(contextKey, param.getSchemaTableNames(),
param.isFull()).marshal());
+ result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey,
param.getSchemaTableNames(), param.isFull())));
result.setDatabaseName(param.getDatabaseName());
result.setSchemaTableNames(param.getSchemaTableNames());
result.setFull(param.isFull());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
index b7657475f2f..f0d0c0b0bee 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
@@ -22,6 +22,8 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -29,7 +31,7 @@ class CDCJobIdTest {
@Test
void assertParseJobType() {
- String jobId = PipelineJobIdUtils.marshalPrefix(new CDCJobType(), new
PipelineContextKey("sharding_db", InstanceType.PROXY)) + "abcd";
+ String jobId = PipelineJobIdUtils.marshal(new CDCJobId(new
PipelineContextKey("sharding_db", InstanceType.PROXY),
Collections.singletonList("foo"), true));
assertThat(PipelineJobIdUtils.parseJobType(jobId),
instanceOf(CDCJobType.class));
}
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 295145dc2f8..06dfe9ee8ab 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -17,15 +17,16 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+import lombok.Getter;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
/**
* Consistency check job id.
*/
+@Getter
public final class ConsistencyCheckJobId implements PipelineJobId {
private final PipelineJobType jobType = new ConsistencyCheckJobType();
@@ -59,9 +60,4 @@ public final class ConsistencyCheckJobId implements
PipelineJobId {
public static int parseSequence(final String checkJobId) {
return Integer.parseInt(checkJobId.substring(checkJobId.length() - 1));
}
-
- @Override
- public String marshal() {
- return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) +
parentJobId + sequence;
- }
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index e4b7f31ebea..be1d5ee6f60 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -96,7 +96,8 @@ public final class ConsistencyCheckJobAPI {
}
checkPipelineDatabaseTypes(param);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(parentJobId);
- String result = latestCheckJobId.map(optional -> new
ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new
ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
+ String result = PipelineJobIdUtils.marshal(
+ latestCheckJobId.map(optional -> new
ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new
ConsistencyCheckJobId(contextKey, parentJobId)));
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId,
result);
governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId,
result);
jobManager.drop(result);
@@ -155,7 +156,7 @@ public final class ConsistencyCheckJobAPI {
Optional<Integer> previousSequence =
ConsistencyCheckSequence.getPreviousSequence(
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()),
ConsistencyCheckJobId.parseSequence(latestCheckJobId));
if (previousSequence.isPresent()) {
- String checkJobId = new ConsistencyCheckJobId(contextKey,
parentJobId, previousSequence.get()).marshal();
+ String checkJobId = PipelineJobIdUtils.marshal(new
ConsistencyCheckJobId(contextKey, parentJobId, previousSequence.get()));
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId,
checkJobId);
} else {
governanceFacade.getJobFacade().getCheck().deleteLatestCheckJobId(parentJobId);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index b21e1de4718..b0a13d30437 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -17,28 +17,20 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-
-import java.nio.charset.StandardCharsets;
/**
* Migration job id.
*/
@RequiredArgsConstructor
+@Getter
public final class MigrationJobId implements PipelineJobId {
private final PipelineJobType jobType = new MigrationJobType();
private final PipelineContextKey contextKey;
-
- @Override
- public String marshal() {
- return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) +
DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
- }
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index f55ae220f42..75d1bf91c5e 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -162,7 +162,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
result.setTablesFirstDataNodes(new
JobDataNodeLine(tablesFirstDataNodes).marshal());
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- result.setJobId(new MigrationJobId(contextKey).marshal());
+ result.setJobId(PipelineJobIdUtils.marshal(new
MigrationJobId(contextKey)));
return result;
}
diff --git a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 6af65a7a0bc..f7c7bcf31bb 100644
--- a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -38,7 +38,7 @@ class PipelineJobIdUtilsTest {
private void assertParse0(final InstanceType instanceType) {
PipelineContextKey contextKey = new PipelineContextKey("sharding_db",
instanceType);
- String jobId = new MigrationJobId(contextKey).marshal();
+ String jobId = PipelineJobIdUtils.marshal(new
MigrationJobId(contextKey));
assertThat(PipelineJobIdUtils.parseJobType(jobId),
instanceOf(MigrationJobType.class));
PipelineContextKey actualContextKey =
PipelineJobIdUtils.parseContextKey(jobId);
assertThat(actualContextKey.getInstanceType(), is(instanceType));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 1f27e007999..8a63105604c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
@@ -76,7 +77,7 @@ public final class JobConfigurationBuilder {
result.setTablesFirstDataNodes("t_order:ds_0.t_order");
result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
PipelineContextKey contextKey = new
PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
- result.setJobId(new MigrationJobId(contextKey).marshal());
+ result.setJobId(PipelineJobIdUtils.marshal(new
MigrationJobId(contextKey)));
Map<String, YamlPipelineDataSourceConfiguration> sources = new
LinkedHashMap<>();
String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
PipelineDataSourceConfiguration sourceDataSourceConfig = new
StandardPipelineDataSourceConfiguration(
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 1905c5dd263..9f86dcef8f4 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -49,7 +50,7 @@ class ConsistencyCheckJobTest {
@Test
void assertBuildPipelineJobItemContext() {
ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new
PipelineContextKey(InstanceType.PROXY),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
- String checkJobId = pipelineJobId.marshal();
+ String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index e3b53e10fcf..902bc94a93a 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -72,7 +72,7 @@ class ConsistencyCheckJobAPITest {
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
- String expectCheckJobId = new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence).marshal();
+ String expectCheckJobId = PipelineJobIdUtils.marshal(new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence));
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
assertNull(checkJobConfig.getAlgorithmTypeName());
int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);