This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 15f229279ee Remove AbstractPipelineJobId (#29278)
15f229279ee is described below
commit 15f229279ee144cea79093dba41db8dae0e29d8a
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Dec 4 20:13:45 2023 +0800
Remove AbstractPipelineJobId (#29278)
---
.../data/pipeline/common/job/PipelineJobId.java | 9 +----
.../pipeline/core/job/AbstractPipelineJobId.java | 44 ----------------------
.../data/pipeline/core/job/PipelineJobIdUtils.java | 14 +++----
.../shardingsphere/data/pipeline/cdc/CDCJobId.java | 23 +++++------
.../data/pipeline/cdc/CDCJobIdTest.java | 2 +-
.../consistencycheck/ConsistencyCheckJobId.java | 16 +++++---
.../scenario/migration/MigrationJobId.java | 21 ++++++-----
7 files changed, 45 insertions(+), 84 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
index 9f8039a30c1..9eeec1d9383 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
@@ -25,6 +25,8 @@ import
org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
*/
public interface PipelineJobId {
+ String CURRENT_VERSION = "02";
+
/**
* Get job type.
*
@@ -32,13 +34,6 @@ public interface PipelineJobId {
*/
PipelineJobType getJobType();
- /**
- * Get format version.
- *
- * @return format version
- */
- String getFormatVersion();
-
/**
* Get pipeline context key.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
deleted file mode 100644
index 17ffc1d82db..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-
-/**
- * Abstract pipeline job id.
- */
-@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
-@Getter
-public abstract class AbstractPipelineJobId implements PipelineJobId {
-
- public static final String CURRENT_VERSION = "02";
-
- private final PipelineJobType jobType;
-
- private final PipelineContextKey contextKey;
-
- @Override
- public String getFormatVersion() {
- return CURRENT_VERSION;
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index afa352deec3..3b0c731d87e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -44,18 +44,18 @@ import java.nio.charset.StandardCharsets;
public final class PipelineJobIdUtils {
/**
- * Marshal job id common prefix.
+ * Marshal job id prefix.
*
- * @param pipelineJobId pipeline job id
+ * @param jobId pipeline job id
* @return job id common prefix
*/
- public static String marshalJobIdCommonPrefix(final PipelineJobId
pipelineJobId) {
- InstanceType instanceType =
pipelineJobId.getContextKey().getInstanceType();
- String databaseName = instanceType == InstanceType.PROXY ? "" :
pipelineJobId.getContextKey().getDatabaseName();
+ public static String marshalPrefix(final PipelineJobId jobId) {
+ InstanceType instanceType = jobId.getContextKey().getInstanceType();
+ String databaseName = instanceType == InstanceType.PROXY ? "" :
jobId.getContextKey().getDatabaseName();
String databaseNameHex =
Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
String databaseNameLengthHex =
Hex.encodeHexString(Shorts.toByteArray((short) databaseNameHex.length()), true);
char encodedInstanceType = InstanceTypeUtils.encode(instanceType);
- return 'j' + pipelineJobId.getJobType().getCode() +
pipelineJobId.getFormatVersion() + encodedInstanceType + databaseNameLengthHex
+ databaseNameHex;
+ return 'j' + jobId.getJobType().getCode() + jobId.CURRENT_VERSION +
encodedInstanceType + databaseNameLengthHex + databaseNameHex;
}
/**
@@ -84,7 +84,7 @@ public final class PipelineJobIdUtils {
public static PipelineContextKey parseContextKey(final String jobId) {
verifyJobId(jobId);
String formatVersion = jobId.substring(3, 5);
-
Preconditions.checkArgument(AbstractPipelineJobId.CURRENT_VERSION.equals(formatVersion),
"Format version doesn't match, format version: " + formatVersion);
+
Preconditions.checkArgument(PipelineJobId.CURRENT_VERSION.equals(formatVersion),
"Format version doesn't match, format version: " + formatVersion);
char instanceType = jobId.charAt(5);
short databaseNameLength =
Shorts.fromByteArray(Hex.decodeHex(jobId.substring(6, 10)));
String databaseName = new String(Hex.decodeHex(jobId.substring(10, 10
+ databaseNameLength)), StandardCharsets.UTF_8);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
index 14c68f8480a..868a2ccce40 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobId.java
@@ -19,10 +19,12 @@ package org.apache.shardingsphere.data.pipeline.cdc;
import com.google.common.base.Joiner;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import java.nio.charset.StandardCharsets;
@@ -31,9 +33,15 @@ import java.util.List;
/**
* CDC job id.
*/
-@Getter
+@RequiredArgsConstructor
@ToString(callSuper = true)
-public final class CDCJobId extends AbstractPipelineJobId {
+public final class CDCJobId implements PipelineJobId {
+
+ @Getter
+ private final PipelineJobType jobType = new CDCJobType();
+
+ @Getter
+ private final PipelineContextKey contextKey;
private final List<String> schemaTableNames;
@@ -41,15 +49,8 @@ public final class CDCJobId extends AbstractPipelineJobId {
private final String sinkType;
- public CDCJobId(final PipelineContextKey contextKey, final List<String>
schemaTableNames, final boolean full, final String sinkType) {
- super(new CDCJobType(), contextKey);
- this.schemaTableNames = schemaTableNames;
- this.full = full;
- this.sinkType = sinkType;
- }
-
@Override
public String marshal() {
- return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) +
DigestUtils.md5Hex(Joiner.on('|').join(getContextKey().getDatabaseName(),
schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
+ return PipelineJobIdUtils.marshalPrefix(this) +
DigestUtils.md5Hex(Joiner.on('|').join(getContextKey().getDatabaseName(),
schemaTableNames, full).getBytes(StandardCharsets.UTF_8));
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
index ede3d20e103..741a26a35e7 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobIdTest.java
@@ -35,7 +35,7 @@ class CDCJobIdTest {
void assertParseJobType() {
PipelineContextKey contextKey = new PipelineContextKey("sharding_db",
InstanceType.PROXY);
CDCJobId pipelineJobId = new CDCJobId(contextKey,
Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
- String jobId =
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
+ String jobId = PipelineJobIdUtils.marshalPrefix(pipelineJobId) +
"abcd";
PipelineJobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
assertThat(actualJobType, instanceOf(CDCJobType.class));
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index b9fbfda2c41..3f6e7da8e4e 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -20,16 +20,22 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import lombok.Getter;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
/**
* Consistency check job id.
*/
-@Getter
@ToString(callSuper = true)
-public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
+public final class ConsistencyCheckJobId implements PipelineJobId {
+
+ @Getter
+ private final PipelineJobType jobType = new ConsistencyCheckJobType();
+
+ @Getter
+ private final PipelineContextKey contextKey;
private final String parentJobId;
@@ -44,7 +50,7 @@ public final class ConsistencyCheckJobId extends
AbstractPipelineJobId {
}
public ConsistencyCheckJobId(final PipelineContextKey contextKey, final
String parentJobId, final int sequence) {
- super(new ConsistencyCheckJobType(), contextKey);
+ this.contextKey = contextKey;
this.parentJobId = parentJobId;
this.sequence = sequence > ConsistencyCheckSequence.MAX_SEQUENCE ?
ConsistencyCheckSequence.MIN_SEQUENCE : sequence;
}
@@ -61,6 +67,6 @@ public final class ConsistencyCheckJobId extends
AbstractPipelineJobId {
@Override
public String marshal() {
- return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) + parentJobId
+ sequence;
+ return PipelineJobIdUtils.marshalPrefix(this) + parentJobId + sequence;
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index 8b925d7b722..97777493790 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -18,10 +18,12 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
@@ -31,19 +33,20 @@ import java.util.List;
/**
* Migration job id.
*/
-@Getter
+@RequiredArgsConstructor
@ToString(callSuper = true)
-public final class MigrationJobId extends AbstractPipelineJobId {
+public final class MigrationJobId implements PipelineJobId {
- private final List<String> jobShardingDataNodes;
+ @Getter
+ private final PipelineJobType jobType = new MigrationJobType();
- public MigrationJobId(final PipelineContextKey contextKey, final
List<String> jobShardingDataNodes) {
- super(new MigrationJobType(), contextKey);
- this.jobShardingDataNodes = jobShardingDataNodes;
- }
+ @Getter
+ private final PipelineContextKey contextKey;
+
+ private final List<String> jobShardingDataNodes;
@Override
public String marshal() {
- return PipelineJobIdUtils.marshalJobIdCommonPrefix(this) +
DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
+ return PipelineJobIdUtils.marshalPrefix(this) +
DigestUtils.md5Hex(JsonUtils.toJsonString(this).getBytes(StandardCharsets.UTF_8));
}
}