This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new df6cb453c7a Refactor PipelineJobIdUtils.marshalPrefix() (#29280)
df6cb453c7a is described below
commit df6cb453c7a938a2b41b3375eb91af45c5c8bcfa
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Dec 4 22:30:35 2023 +0800
Refactor PipelineJobIdUtils.marshalPrefix() (#29280)
* Refactor PipelineJobIdUtils.marshalPrefix()
* Add debug info
* Remove debug info
---
.../data/pipeline/core/job/PipelineJobIdUtils.java | 11 ++++++-----
.../org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java | 2 +-
.../apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java | 11 ++---------
.../scenario/consistencycheck/ConsistencyCheckJobId.java | 2 +-
.../data/pipeline/scenario/migration/MigrationJobId.java | 2 +-
5 files changed, 11 insertions(+), 17 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index 3b0c731d87e..add4d57766d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -46,16 +46,17 @@ public final class PipelineJobIdUtils {
/**
* Marshal job id prefix.
*
- * @param jobId pipeline job id
+ * @param jobType pipeline job type
+ * @param contextKey pipeline context key
* @return job id common prefix
*/
- public static String marshalPrefix(final PipelineJobId jobId) {
- InstanceType instanceType = jobId.getContextKey().getInstanceType();
- String databaseName = instanceType == InstanceType.PROXY ? "" : jobId.getContextKey().getDatabaseName();
+ public static String marshalPrefix(final PipelineJobType jobType, final PipelineContextKey contextKey) {
+ InstanceType instanceType = contextKey.getInstanceType();
+ String databaseName = instanceType == InstanceType.PROXY ? "" : contextKey.getDatabaseName();
String databaseNameHex = Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
String databaseNameLengthHex = Hex.encodeHexString(Shorts.toByteArray((short) databaseNameHex.length()), true);
char encodedInstanceType = InstanceTypeUtils.encode(instanceType);
- return 'j' + jobId.getJobType().getCode() + jobId.CURRENT_VERSION + encodedInstanceType + databaseNameLengthHex + databaseNameHex;
+ return 'j' + jobType.getCode() + PipelineJobId.CURRENT_VERSION + encodedInstanceType + databaseNameLengthHex + databaseNameHex;
}
/**
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
index 868a2ccce40..aea9140d643 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
@@ -51,6 +51,6 @@ public final class CDCJobId implements PipelineJobId {
@Override
public String marshal() {
- return PipelineJobIdUtils.marshalPrefix(this) + DigestUtils.md5Hex(Joiner.on('|').join(getContextKey().getDatabaseName(), schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
+ return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + DigestUtils.md5Hex(Joiner.on('|').join(contextKey.getDatabaseName(), schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
}
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
index 741a26a35e7..b7657475f2f 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
@@ -17,15 +17,11 @@
package org.apache.shardingsphere.data.pipeline.cdc;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -33,10 +29,7 @@ class CDCJobIdTest {
@Test
void assertParseJobType() {
- PipelineContextKey contextKey = new PipelineContextKey("sharding_db", InstanceType.PROXY);
- CDCJobId pipelineJobId = new CDCJobId(contextKey, Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
- String jobId = PipelineJobIdUtils.marshalPrefix(pipelineJobId) + "abcd";
- PipelineJobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
- assertThat(actualJobType, instanceOf(CDCJobType.class));
+ String jobId = PipelineJobIdUtils.marshalPrefix(new CDCJobType(), new PipelineContextKey("sharding_db", InstanceType.PROXY)) + "abcd";
+ assertThat(PipelineJobIdUtils.parseJobType(jobId), instanceOf(CDCJobType.class));
}
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 3f6e7da8e4e..48b49d28624 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -67,6 +67,6 @@ public final class ConsistencyCheckJobId implements PipelineJobId {
@Override
public String marshal() {
- return PipelineJobIdUtils.marshalPrefix(this) + parentJobId + sequence;
+ return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + parentJobId + sequence;
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index 97777493790..37d0cac87fb 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -47,6 +47,6 @@ public final class MigrationJobId implements PipelineJobId {
@Override
public String marshal() {
- return PipelineJobIdUtils.marshalPrefix(this) + DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
+ return PipelineJobIdUtils.marshalPrefix(jobType, contextKey) + DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
}
}