This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bebe441d28f Refactor job id formatting and yaml job configuration
extension for common usage (#20064)
bebe441d28f is described below
commit bebe441d28f208f3aa51564c57380c5fc464f071
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Aug 11 09:26:55 2022 +0800
Refactor job id formatting and yaml job configuration extension for common
usage (#20064)
* Rename JobId to PipelineJobId
* Refactor JobType
* Remove JobSubType
* Extract job id marshal & parse and extendJobConfiguration to
PipelineJobAPI
* Fix
* Update job config path pattern
---
.../data/pipeline/api/PipelineJobAPI.java | 27 ++++++++++
.../job/YamlPipelineJobConfiguration.java} | 30 ++++++------
.../yaml/YamlRuleAlteredJobConfiguration.java | 44 +----------------
...stractJobId.java => AbstractPipelineJobId.java} | 11 ++---
.../data/pipeline/api/job/JobType.java | 33 +++++++++++--
.../api/job/{JobId.java => PipelineJobId.java} | 13 +----
.../data/pipeline/api/job/RuleAlteredJobId.java | 55 +--------------------
.../api/fixture/RuleAlteredJobAPIFixture.java | 17 +++++++
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 21 ++++++++
.../core/api/impl/RuleAlteredJobAPIImpl.java | 47 ++++++++++++++++++
.../pipeline/core/execute/PipelineJobExecutor.java | 2 +-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 3 +-
.../pipeline/core/job/RuleAlteredJobIdTest.java | 57 ----------------------
.../core/util/JobConfigurationBuilder.java | 24 ++-------
14 files changed, 173 insertions(+), 211 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
index 124135e584b..7388cf15821 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
@@ -17,11 +17,38 @@
package org.apache.shardingsphere.data.pipeline.api;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+
/**
* Pipeline job API.
*/
public interface PipelineJobAPI {
+ /**
+ * Marshal pipeline job id.
+ *
+ * @param pipelineJobId pipeline job id
+ * @return marshaled text
+ */
+ String marshalJobId(PipelineJobId pipelineJobId);
+
+ /**
+ * Parse job type.
+ *
+ * @param jobId job id
+ * @return job type
+ */
+ JobType parseJobType(String jobId);
+
+ /**
+ * Extend job configuration.
+ *
+ * @param yamlJobConfig yaml job configuration
+ */
+ void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
+
/**
* Start pipeline job by id.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobSubType.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
similarity index 65%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobSubType.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
index 800fde579e5..1a39b7c854c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobSubType.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
@@ -15,24 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.api.config.job;
-import com.google.common.base.Preconditions;
-import lombok.Getter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Job sub-type.
+ * YAML pipeline job configuration.
*/
-@Getter
-public enum JobSubType {
+public interface YamlPipelineJobConfiguration extends YamlConfiguration {
- SCALING("01"),
- ENCRYPTION("02");
+ /**
+ * Get job id.
+ *
+ * @return job id
+ */
+ String getJobId();
- private final String value;
-
- JobSubType(final String value) {
- Preconditions.checkArgument(value.length() == 2, "value length is not
2");
- this.value = value;
- }
+ /**
+ * Get database name.
+ *
+ * @return database name
+ */
+ String getDatabaseName();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index 99f3f432e30..39840c6a08d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -18,20 +18,12 @@
package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -41,7 +33,7 @@ import java.util.Map;
@Getter
@Setter
@Slf4j
-public final class YamlRuleAlteredJobConfiguration implements
YamlConfiguration {
+public final class YamlRuleAlteredJobConfiguration implements
YamlPipelineJobConfiguration {
private String jobId;
@@ -112,38 +104,6 @@ public final class YamlRuleAlteredJobConfiguration
implements YamlConfiguration
Preconditions.checkNotNull(yamlConfig.getParameter());
}
- /**
- * Extend configuration.
- */
- public void extendConfiguration() {
- if (null == getJobShardingDataNodes()) {
-
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this);
- }
- if (null == jobId) {
- jobId = generateJobId();
- }
- if (Strings.isNullOrEmpty(getSourceDatabaseType())) {
- PipelineDataSourceConfiguration sourceDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(source.getType(),
source.getParameter());
-
setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
- }
- if (Strings.isNullOrEmpty(getTargetDatabaseType())) {
- PipelineDataSourceConfiguration targetDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(target.getType(),
target.getParameter());
-
setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
- }
- }
-
- private String generateJobId() {
- RuleAlteredJobId jobId = new RuleAlteredJobId();
- // TODO type, subTypes
- jobId.setType(JobType.RULE_ALTERED.getValue());
- jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
-
jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
- jobId.setCurrentMetadataVersion(activeVersion);
- jobId.setNewMetadataVersion(newVersion);
- jobId.setDatabaseName(databaseName);
- return jobId.marshal();
- }
-
@Override
public String toString() {
return "YamlRuleAlteredJobConfiguration{"
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractJobId.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java
similarity index 86%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractJobId.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java
index fd754bef215..2b460333ecb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractJobId.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java
@@ -22,25 +22,20 @@ import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
-import java.util.List;
-
/**
- * Abstract job id.
+ * Abstract pipeline job id.
*/
@Getter
@Setter
@ToString
-public abstract class AbstractJobId implements JobId {
+public abstract class AbstractPipelineJobId implements PipelineJobId {
@NonNull
- private String type;
+ private String typeCode;
@NonNull
private String formatVersion;
- @NonNull
- private List<String> subTypes;
-
@NonNull
private String databaseName;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
index fc26dfe3619..57118ef052c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
@@ -20,18 +20,41 @@ package org.apache.shardingsphere.data.pipeline.api.job;
import com.google.common.base.Preconditions;
import lombok.Getter;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
* Job type.
*/
@Getter
public enum JobType {
- RULE_ALTERED("01");
+ MIGRATION("MIGRATION", "01");
+
+ private static final Map<String, JobType> CODE_JOB_TYPE_MAP;
+
+ static {
+ CODE_JOB_TYPE_MAP =
Arrays.stream(JobType.values()).collect(Collectors.toMap(JobType::getTypeCode,
each -> each));
+ }
- private final String value;
+ private final String typeName;
+
+ private final String typeCode;
+
+ JobType(final String typeName, final String typeCode) {
+ this.typeName = typeName;
+ Preconditions.checkArgument(typeCode.length() == 2, "code length is
not 2");
+ this.typeCode = typeCode;
+ }
- JobType(final String value) {
- Preconditions.checkArgument(value.length() == 2, "value length is not
2");
- this.value = value;
+ /**
+ * Value of by code.
+ *
+ * @param typeCode type code
+ * @return job type
+ */
+ public static JobType valueOfByCode(final String typeCode) {
+ return CODE_JOB_TYPE_MAP.get(typeCode);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobId.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
similarity index 86%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobId.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
index ea090e5d445..77749a0cf6d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobId.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
@@ -17,19 +17,17 @@
package org.apache.shardingsphere.data.pipeline.api.job;
-import java.util.List;
-
/**
* Job id.
*/
-public interface JobId {
+public interface PipelineJobId {
/**
* Get type.
*
* @return type
*/
- String getType();
+ String getTypeCode();
/**
* Get format version.
@@ -38,13 +36,6 @@ public interface JobId {
*/
String getFormatVersion();
- /**
- * Get sub-types.
- *
- * @return sub-types
- */
- List<String> getSubTypes();
-
/**
* Get database name.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
index 626a34db871..f21780e26c3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
@@ -17,20 +17,10 @@
package org.apache.shardingsphere.data.pipeline.api.job;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
-import lombok.SneakyThrows;
import lombok.ToString;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
/**
* Job id.
@@ -38,8 +28,8 @@ import java.util.stream.Collectors;
@Getter
@Setter
@ToString(callSuper = true)
-// TODO refactor as SPI
-public final class RuleAlteredJobId extends AbstractJobId {
+// TODO rename and change fields
+public final class RuleAlteredJobId extends AbstractPipelineJobId {
public static final String CURRENT_VERSION = "01";
@@ -48,45 +38,4 @@ public final class RuleAlteredJobId extends AbstractJobId {
@NonNull
private Integer newMetadataVersion;
-
- /**
- * Marshal job id.
- *
- * @return job id text. Format: {type} +
hex({formatVersion}|{sortedSubTypes}|{currentMetadataVersion}T{newMetadataVersion}|{databaseName})
- */
- public String marshal() {
- List<String> subTypes = getSubTypes();
- Collections.sort(subTypes);
- String text = getFormatVersion() + "|" + String.join("-", subTypes) +
"|" + getCurrentMetadataVersion() + "T" + getNewMetadataVersion() + "|" +
getDatabaseName();
- return getType() +
Hex.encodeHexString(text.getBytes(StandardCharsets.UTF_8), true);
- }
-
- /**
- * Unmarshal from hex text.
- *
- * @param hexText hex text
- * @return job id object
- */
- @SneakyThrows(DecoderException.class)
- public static RuleAlteredJobId unmarshal(final String hexText) {
- if (hexText.length() <= 2) {
- throw new IllegalArgumentException("Invalid hex text length,
hexText=" + hexText);
- }
- String type = hexText.substring(0, 2);
- String text = new String(Hex.decodeHex(hexText.substring(2)),
StandardCharsets.UTF_8);
- List<String> splittedText = Splitter.on('|').splitToList(text);
- String formatVersion = splittedText.get(0);
- Preconditions.checkState("01".equals(formatVersion), "Unknown
formatVersion=" + formatVersion);
- List<String> subTypes =
Splitter.on('-').splitToList(splittedText.get(1));
- List<Integer> metadataVersions =
Splitter.on('T').splitToList(splittedText.get(2)).stream().map(Integer::parseInt).collect(Collectors.toList());
- String databaseName = splittedText.get(3);
- RuleAlteredJobId result = new RuleAlteredJobId();
- result.setType(type);
- result.setFormatVersion(formatVersion);
- result.setSubTypes(subTypes);
- result.setCurrentMetadataVersion(metadataVersions.get(0));
- result.setNewMetadataVersion(metadataVersions.get(1));
- result.setDatabaseName(databaseName);
- return result;
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
index 2e9817ab54a..6fec99f19cf 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
@@ -19,9 +19,12 @@ package org.apache.shardingsphere.data.pipeline.api.fixture;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -34,6 +37,20 @@ import java.util.Properties;
public final class RuleAlteredJobAPIFixture implements RuleAlteredJobAPI {
+ @Override
+ public String marshalJobId(final PipelineJobId pipelineJobId) {
+ return null;
+ }
+
+ @Override
+ public JobType parseJobType(final String jobId) {
+ return null;
+ }
+
+ @Override
+ public void extendJobConfiguration(final YamlPipelineJobConfiguration
yamlJobConfig) {
+ }
+
@Override
public void startDisabledJob(final String jobId) {
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 406037d25a3..9f496fa20e2 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -35,6 +37,25 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
protected static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    // Job id layout: literal 'j' prefix, then the type code, then the subclass-specific remainder.
+    @Override
+    public final String marshalJobId(final PipelineJobId pipelineJobId) {
+        String typeCode = pipelineJobId.getTypeCode();
+        String leftPart = marshalJobIdLeftPart(pipelineJobId);
+        return 'j' + typeCode + leftPart;
+    }
+
+    /**
+     * Marshal the part of the job id that follows the type code.
+     *
+     * @param pipelineJobId pipeline job id
+     * @return text appended after the 'j' prefix and the type code
+     */
+    protected abstract String marshalJobIdLeftPart(PipelineJobId pipelineJobId);
+
+    /**
+     * Parse job type from the leading characters of a marshaled job id.
+     *
+     * @param jobId job id, laid out as 'j' + two-character type code + remainder
+     * @return job type looked up by the type code via {@link JobType#valueOfByCode(String)}
+     * @throws IllegalArgumentException if the job id has at most 3 characters or does not start with 'j'
+     */
+    @Override
+    public JobType parseJobType(final String jobId) {
+        if (jobId.length() <= 3) {
+            throw new IllegalArgumentException("Invalid jobId length, jobId=" + jobId);
+        }
+        // marshalJobId always emits a leading 'j', so only a different first char marks a malformed id.
+        if ('j' != jobId.charAt(0)) {
+            throw new IllegalArgumentException("Invalid jobId, first char=" + jobId.charAt(0));
+        }
+        // Characters 1..2 hold the two-character type code written right after the prefix.
+        String typeCode = jobId.substring(1, 3);
+        return JobType.valueOfByCode(typeCode);
+    }
+
@Override
public void startDisabledJob(final String jobId) {
log.info("Start disabled pipeline job {}", jobId);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index a952c9b1eb3..590abafa388 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -18,14 +18,23 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -52,6 +61,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -61,6 +71,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
+import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
@@ -83,6 +94,42 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
private static final YamlJobProgressSwapper SWAPPER = new
YamlJobProgressSwapper();
+    // Encodes "{formatVersion}|{currentMetadataVersion}T{newMetadataVersion}|{databaseName}" as lower-case hex.
+    @Override
+    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
+        RuleAlteredJobId ruleAlteredJobId = (RuleAlteredJobId) pipelineJobId;
+        String plainText = String.join("|",
+                ruleAlteredJobId.getFormatVersion(),
+                ruleAlteredJobId.getCurrentMetadataVersion() + "T" + ruleAlteredJobId.getNewMetadataVersion(),
+                ruleAlteredJobId.getDatabaseName());
+        byte[] utf8Bytes = plainText.getBytes(StandardCharsets.UTF_8);
+        return Hex.encodeHexString(utf8Bytes, true);
+    }
+
+    // Fills in only the values that are still missing; anything already set on the configuration is kept.
+    @Override
+    public void extendJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+        YamlRuleAlteredJobConfiguration yamlConfig = (YamlRuleAlteredJobConfiguration) yamlJobConfig;
+        if (null == yamlConfig.getJobShardingDataNodes()) {
+            RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(yamlConfig);
+        }
+        if (null == yamlConfig.getJobId()) {
+            yamlConfig.setJobId(generateJobId(yamlConfig));
+        }
+        if (Strings.isNullOrEmpty(yamlConfig.getSourceDatabaseType())) {
+            // Derive the source database type from the source data source definition.
+            yamlConfig.setSourceDatabaseType(
+                    PipelineDataSourceConfigurationFactory.newInstance(yamlConfig.getSource().getType(), yamlConfig.getSource().getParameter()).getDatabaseType().getType());
+        }
+        if (Strings.isNullOrEmpty(yamlConfig.getTargetDatabaseType())) {
+            // Derive the target database type from the target data source definition.
+            yamlConfig.setTargetDatabaseType(
+                    PipelineDataSourceConfigurationFactory.newInstance(yamlConfig.getTarget().getType(), yamlConfig.getTarget().getParameter()).getDatabaseType().getType());
+        }
+    }
+
+    // Builds a migration job id from the configuration's metadata versions and database name, then marshals it.
+    private String generateJobId(final YamlRuleAlteredJobConfiguration config) {
+        RuleAlteredJobId result = new RuleAlteredJobId();
+        // Common part shared by every pipeline job id.
+        result.setTypeCode(JobType.MIGRATION.getTypeCode());
+        result.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
+        // Rule-altered specific part.
+        result.setCurrentMetadataVersion(config.getActiveVersion());
+        result.setNewMetadataVersion(config.getNewVersion());
+        result.setDatabaseName(config.getDatabaseName());
+        return marshalJobId(result);
+    }
+
@Override
public List<JobInfo> list() {
checkModeConfig();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 84a6e39b618..e29231662db 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -44,7 +44,7 @@ import java.util.regex.Pattern;
@Slf4j
public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
- private static final Pattern CONFIG_PATTERN =
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT +
"/(\\d{2}[0-9a-f]+)/config");
+ private static final Pattern CONFIG_PATTERN =
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT +
"/(j\\d{2}[0-9a-f]+)/config");
private final ExecutorService executor = Executors.newFixedThreadPool(20);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index c7e8633ab8e..52e0b4cf91e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
@@ -204,7 +205,7 @@ public final class RuleAlteredJobWorker {
result.setNewVersion(event.getNewVersion());
result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
- result.extendConfiguration();
+ new RuleAlteredJobAPIImpl().extendJobConfiguration(result);
return Optional.of(new
RuleAlteredJobConfigurationSwapper().swapToObject(result));
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/RuleAlteredJobIdTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/RuleAlteredJobIdTest.java
deleted file mode 100644
index bdd1e99d59f..00000000000
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/RuleAlteredJobIdTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class RuleAlteredJobIdTest {
-
- @Test
- public void assertSerialization() {
- RuleAlteredJobId jobId = new RuleAlteredJobId();
- jobId.setType(JobType.RULE_ALTERED.getValue());
- jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
- Pair<List<String>, List<String>> subTypesPair = getSubTypesPair();
- jobId.setSubTypes(subTypesPair.getLeft());
- jobId.setCurrentMetadataVersion(0);
- jobId.setNewMetadataVersion(1);
- jobId.setDatabaseName("sharding_db");
- String hexText = jobId.marshal();
- RuleAlteredJobId actual = RuleAlteredJobId.unmarshal(hexText);
- assertThat(actual.getFormatVersion(), is(jobId.getFormatVersion()));
- assertThat(actual.getSubTypes(), is(subTypesPair.getRight()));
- assertThat(actual.getCurrentMetadataVersion(),
is(jobId.getCurrentMetadataVersion()));
- assertThat(actual.getNewMetadataVersion(),
is(jobId.getNewMetadataVersion()));
- assertThat(actual.getDatabaseName(), is(jobId.getDatabaseName()));
- }
-
- private Pair<List<String>, List<String>> getSubTypesPair() {
- return Pair.of(Arrays.asList(JobSubType.ENCRYPTION.getValue(),
JobSubType.SCALING.getValue()),
- Arrays.asList(JobSubType.SCALING.getValue(),
JobSubType.ENCRYPTION.getValue()));
- }
-}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 1900339730a..7fa35b89273 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -26,9 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
+import
org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl;
import
org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
import java.util.Collections;
@@ -49,15 +47,14 @@ public final class JobConfigurationBuilder {
YamlRuleAlteredJobConfiguration result = new
YamlRuleAlteredJobConfiguration();
result.setDatabaseName("logic_db");
result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(),
Collections.singletonList("t_order")));
- result.setActiveVersion(0);
- result.setNewVersion(1);
+ int activeVersion =
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
+ result.setActiveVersion(activeVersion);
+ result.setNewVersion(activeVersion + 1);
// TODO add autoTables in config file
result.setSource(createYamlPipelineDataSourceConfiguration(
new
ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
result.setTarget(createYamlPipelineDataSourceConfiguration(new
StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
- result.extendConfiguration();
- int activeVersion =
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
- result.setJobId(generateJobId(activeVersion, "logic_db"));
+ new RuleAlteredJobAPIImpl().extendJobConfiguration(result);
return new RuleAlteredJobConfigurationSwapper().swapToObject(result);
}
@@ -67,15 +64,4 @@ public final class JobConfigurationBuilder {
result.setParameter(config.getParameter());
return result;
}
-
- private static String generateJobId(final int activeVersion, final String
databaseName) {
- RuleAlteredJobId jobId = new RuleAlteredJobId();
- jobId.setType(JobType.RULE_ALTERED.getValue());
- jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
-
jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
- jobId.setCurrentMetadataVersion(activeVersion);
- jobId.setNewMetadataVersion(activeVersion + 1);
- jobId.setDatabaseName(databaseName);
- return jobId.marshal();
- }
}