This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bebe441d28f Refactor job id formatting and yaml job configuration 
extension for common usage (#20064)
bebe441d28f is described below

commit bebe441d28f208f3aa51564c57380c5fc464f071
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Aug 11 09:26:55 2022 +0800

    Refactor job id formatting and yaml job configuration extension for common 
usage (#20064)
    
    * Rename JobId to PipelineJobId
    
    * Refactor JobType
    
    * Remove JobSubType
    
    * Extract job id marshal & parse and extendJobConfiguration to 
PipelineJobAPI
    
    * Fix
    
    * Update job config path pattern
---
 .../data/pipeline/api/PipelineJobAPI.java          | 27 ++++++++++
 .../job/YamlPipelineJobConfiguration.java}         | 30 ++++++------
 .../yaml/YamlRuleAlteredJobConfiguration.java      | 44 +----------------
 ...stractJobId.java => AbstractPipelineJobId.java} | 11 ++---
 .../data/pipeline/api/job/JobType.java             | 33 +++++++++++--
 .../api/job/{JobId.java => PipelineJobId.java}     | 13 +----
 .../data/pipeline/api/job/RuleAlteredJobId.java    | 55 +--------------------
 .../api/fixture/RuleAlteredJobAPIFixture.java      | 17 +++++++
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  | 21 ++++++++
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 47 ++++++++++++++++++
 .../pipeline/core/execute/PipelineJobExecutor.java |  2 +-
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  3 +-
 .../pipeline/core/job/RuleAlteredJobIdTest.java    | 57 ----------------------
 .../core/util/JobConfigurationBuilder.java         | 24 ++-------
 14 files changed, 173 insertions(+), 211 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
index 124135e584b..7388cf15821 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
@@ -17,11 +17,38 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+
 /**
  * Pipeline job API.
  */
 public interface PipelineJobAPI {
     
+    /**
+     * Marshal pipeline job id.
+     *
+     * @param pipelineJobId pipeline job id
+     * @return marshaled text
+     */
+    String marshalJobId(PipelineJobId pipelineJobId);
+    
+    /**
+     * Parse job type.
+     *
+     * @param jobId job id
+     * @return job type
+     */
+    JobType parseJobType(String jobId);
+    
+    /**
+     * Extend job configuration.
+     *
+     * @param yamlJobConfig yaml job configuration
+     */
+    void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
+    
     /**
      * Start pipeline job by id.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobSubType.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
similarity index 65%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobSubType.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
index 800fde579e5..1a39b7c854c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobSubType.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
@@ -15,24 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.api.config.job;
 
-import com.google.common.base.Preconditions;
-import lombok.Getter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
- * Job sub-type.
+ * YAML pipeline job configuration.
  */
-@Getter
-public enum JobSubType {
+public interface YamlPipelineJobConfiguration extends YamlConfiguration {
     
-    SCALING("01"),
-    ENCRYPTION("02");
+    /**
+     * Get job id.
+     *
+     * @return job id
+     */
+    String getJobId();
     
-    private final String value;
-    
-    JobSubType(final String value) {
-        Preconditions.checkArgument(value.length() == 2, "value length is not 
2");
-        this.value = value;
-    }
+    /**
+     * Get database name.
+     *
+     * @return database name
+     */
+    String getDatabaseName();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index 99f3f432e30..39840c6a08d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -18,20 +18,12 @@
 package org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -41,7 +33,7 @@ import java.util.Map;
 @Getter
 @Setter
 @Slf4j
-public final class YamlRuleAlteredJobConfiguration implements 
YamlConfiguration {
+public final class YamlRuleAlteredJobConfiguration implements 
YamlPipelineJobConfiguration {
     
     private String jobId;
     
@@ -112,38 +104,6 @@ public final class YamlRuleAlteredJobConfiguration 
implements YamlConfiguration
         Preconditions.checkNotNull(yamlConfig.getParameter());
     }
     
-    /**
-     * Extend configuration.
-     */
-    public void extendConfiguration() {
-        if (null == getJobShardingDataNodes()) {
-            
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(this);
-        }
-        if (null == jobId) {
-            jobId = generateJobId();
-        }
-        if (Strings.isNullOrEmpty(getSourceDatabaseType())) {
-            PipelineDataSourceConfiguration sourceDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(source.getType(), 
source.getParameter());
-            
setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
-        }
-        if (Strings.isNullOrEmpty(getTargetDatabaseType())) {
-            PipelineDataSourceConfiguration targetDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(target.getType(), 
target.getParameter());
-            
setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
-        }
-    }
-    
-    private String generateJobId() {
-        RuleAlteredJobId jobId = new RuleAlteredJobId();
-        // TODO type, subTypes
-        jobId.setType(JobType.RULE_ALTERED.getValue());
-        jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
-        
jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
-        jobId.setCurrentMetadataVersion(activeVersion);
-        jobId.setNewMetadataVersion(newVersion);
-        jobId.setDatabaseName(databaseName);
-        return jobId.marshal();
-    }
-    
     @Override
     public String toString() {
         return "YamlRuleAlteredJobConfiguration{"
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractJobId.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java
similarity index 86%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractJobId.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java
index fd754bef215..2b460333ecb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractJobId.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/AbstractPipelineJobId.java
@@ -22,25 +22,20 @@ import lombok.NonNull;
 import lombok.Setter;
 import lombok.ToString;
 
-import java.util.List;
-
 /**
- * Abstract job id.
+ * Abstract pipeline job id.
  */
 @Getter
 @Setter
 @ToString
-public abstract class AbstractJobId implements JobId {
+public abstract class AbstractPipelineJobId implements PipelineJobId {
     
     @NonNull
-    private String type;
+    private String typeCode;
     
     @NonNull
     private String formatVersion;
     
-    @NonNull
-    private List<String> subTypes;
-    
     @NonNull
     private String databaseName;
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
index fc26dfe3619..57118ef052c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
@@ -20,18 +20,41 @@ package org.apache.shardingsphere.data.pipeline.api.job;
 import com.google.common.base.Preconditions;
 import lombok.Getter;
 
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Job type.
  */
 @Getter
 public enum JobType {
     
-    RULE_ALTERED("01");
+    MIGRATION("MIGRATION", "01");
+    
+    private static final Map<String, JobType> CODE_JOB_TYPE_MAP;
+    
+    static {
+        CODE_JOB_TYPE_MAP = 
Arrays.stream(JobType.values()).collect(Collectors.toMap(JobType::getTypeCode, 
each -> each));
+    }
     
-    private final String value;
+    private final String typeName;
+    
+    private final String typeCode;
+    
+    JobType(final String typeName, final String typeCode) {
+        this.typeName = typeName;
+        Preconditions.checkArgument(typeCode.length() == 2, "code length is 
not 2");
+        this.typeCode = typeCode;
+    }
     
-    JobType(final String value) {
-        Preconditions.checkArgument(value.length() == 2, "value length is not 
2");
-        this.value = value;
+    /**
+     * Get job type by type code.
+     *
+     * @param typeCode type code
+     * @return job type, or {@code null} if no job type has the given type code
+     */
+    public static JobType valueOfByCode(final String typeCode) {
+        return CODE_JOB_TYPE_MAP.get(typeCode);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobId.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
similarity index 86%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobId.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
index ea090e5d445..77749a0cf6d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobId.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
@@ -17,19 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.api.job;
 
-import java.util.List;
-
 /**
  * Job id.
  */
-public interface JobId {
+public interface PipelineJobId {
     
     /**
      * Get type.
      *
      * @return type
      */
-    String getType();
+    String getTypeCode();
     
     /**
      * Get format version.
@@ -38,13 +36,6 @@ public interface JobId {
      */
     String getFormatVersion();
     
-    /**
-     * Get sub-types.
-     *
-     * @return sub-types
-     */
-    List<String> getSubTypes();
-    
     /**
      * Get database name.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
index 626a34db871..f21780e26c3 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/RuleAlteredJobId.java
@@ -17,20 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.api.job;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.Setter;
-import lombok.SneakyThrows;
 import lombok.ToString;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Job id.
@@ -38,8 +28,8 @@ import java.util.stream.Collectors;
 @Getter
 @Setter
 @ToString(callSuper = true)
-// TODO refactor as SPI
-public final class RuleAlteredJobId extends AbstractJobId {
+// TODO rename and change fields
+public final class RuleAlteredJobId extends AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
@@ -48,45 +38,4 @@ public final class RuleAlteredJobId extends AbstractJobId {
     
     @NonNull
     private Integer newMetadataVersion;
-    
-    /**
-     * Marshal job id.
-     *
-     * @return job id text. Format: {type} + 
hex({formatVersion}|{sortedSubTypes}|{currentMetadataVersion}T{newMetadataVersion}|{databaseName})
-     */
-    public String marshal() {
-        List<String> subTypes = getSubTypes();
-        Collections.sort(subTypes);
-        String text = getFormatVersion() + "|" + String.join("-", subTypes) + 
"|" + getCurrentMetadataVersion() + "T" + getNewMetadataVersion() + "|" + 
getDatabaseName();
-        return getType() + 
Hex.encodeHexString(text.getBytes(StandardCharsets.UTF_8), true);
-    }
-    
-    /**
-     * Unmarshal from hex text.
-     *
-     * @param hexText hex text
-     * @return job id object
-     */
-    @SneakyThrows(DecoderException.class)
-    public static RuleAlteredJobId unmarshal(final String hexText) {
-        if (hexText.length() <= 2) {
-            throw new IllegalArgumentException("Invalid hex text length, 
hexText=" + hexText);
-        }
-        String type = hexText.substring(0, 2);
-        String text = new String(Hex.decodeHex(hexText.substring(2)), 
StandardCharsets.UTF_8);
-        List<String> splittedText = Splitter.on('|').splitToList(text);
-        String formatVersion = splittedText.get(0);
-        Preconditions.checkState("01".equals(formatVersion), "Unknown 
formatVersion=" + formatVersion);
-        List<String> subTypes = 
Splitter.on('-').splitToList(splittedText.get(1));
-        List<Integer> metadataVersions = 
Splitter.on('T').splitToList(splittedText.get(2)).stream().map(Integer::parseInt).collect(Collectors.toList());
-        String databaseName = splittedText.get(3);
-        RuleAlteredJobId result = new RuleAlteredJobId();
-        result.setType(type);
-        result.setFormatVersion(formatVersion);
-        result.setSubTypes(subTypes);
-        result.setCurrentMetadataVersion(metadataVersions.get(0));
-        result.setNewMetadataVersion(metadataVersions.get(1));
-        result.setDatabaseName(databaseName);
-        return result;
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
index 2e9817ab54a..6fec99f19cf 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
@@ -19,9 +19,12 @@ package org.apache.shardingsphere.data.pipeline.api.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -34,6 +37,20 @@ import java.util.Properties;
 
 public final class RuleAlteredJobAPIFixture implements RuleAlteredJobAPI {
     
+    @Override
+    public String marshalJobId(final PipelineJobId pipelineJobId) {
+        return null;
+    }
+    
+    @Override
+    public JobType parseJobType(final String jobId) {
+        return null;
+    }
+    
+    @Override
+    public void extendJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
+    }
+    
     @Override
     public void startDisabledJob(final String jobId) {
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 406037d25a3..9f496fa20e2 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -35,6 +37,25 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
+    @Override
+    public final String marshalJobId(final PipelineJobId pipelineJobId) {
+        return 'j' + pipelineJobId.getTypeCode() + 
marshalJobIdLeftPart(pipelineJobId);
+    }
+    
+    protected abstract String marshalJobIdLeftPart(PipelineJobId 
pipelineJobId);
+    
+    @Override
+    public JobType parseJobType(final String jobId) {
+        if (jobId.length() <= 3) {
+            throw new IllegalArgumentException("Invalid jobId length, jobId=" + jobId);
+        }
+        if ('j' != jobId.charAt(0)) { // marshalJobId always emits a leading 'j'; anything else is malformed
+            throw new IllegalArgumentException("Invalid jobId, first char=" + jobId.charAt(0));
+        }
+        String typeCode = jobId.substring(1, 3); // two-character type code follows the 'j' prefix
+        return JobType.valueOfByCode(typeCode);
+    }
+    
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index a952c9b1eb3..590abafa388 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -18,14 +18,23 @@
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -52,6 +61,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -61,6 +71,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
 
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.Collections;
@@ -83,6 +94,42 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     
     private static final YamlJobProgressSwapper SWAPPER = new 
YamlJobProgressSwapper();
     
+    @Override
+    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
+        RuleAlteredJobId jobId = (RuleAlteredJobId) pipelineJobId;
+        String text = jobId.getFormatVersion() + "|" + 
jobId.getCurrentMetadataVersion() + "T" + jobId.getNewMetadataVersion() + "|" + 
jobId.getDatabaseName();
+        return Hex.encodeHexString(text.getBytes(StandardCharsets.UTF_8), 
true);
+    }
+    
+    @Override
+    public void extendJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
+        YamlRuleAlteredJobConfiguration config = 
(YamlRuleAlteredJobConfiguration) yamlJobConfig;
+        if (null == config.getJobShardingDataNodes()) {
+            
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(config);
+        }
+        if (null == yamlJobConfig.getJobId()) {
+            config.setJobId(generateJobId(config));
+        }
+        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
+            PipelineDataSourceConfiguration sourceDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(config.getSource().getType(),
 config.getSource().getParameter());
+            
config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+        }
+        if (Strings.isNullOrEmpty(config.getTargetDatabaseType())) {
+            PipelineDataSourceConfiguration targetDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(config.getTarget().getType(),
 config.getTarget().getParameter());
+            
config.setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
+        }
+    }
+    
+    private String generateJobId(final YamlRuleAlteredJobConfiguration config) 
{
+        RuleAlteredJobId jobId = new RuleAlteredJobId();
+        jobId.setTypeCode(JobType.MIGRATION.getTypeCode());
+        jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
+        jobId.setCurrentMetadataVersion(config.getActiveVersion());
+        jobId.setNewMetadataVersion(config.getNewVersion());
+        jobId.setDatabaseName(config.getDatabaseName());
+        return marshalJobId(jobId);
+    }
+    
     @Override
     public List<JobInfo> list() {
         checkModeConfig();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 84a6e39b618..e29231662db 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -44,7 +44,7 @@ import java.util.regex.Pattern;
 @Slf4j
 public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
     
-    private static final Pattern CONFIG_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(\\d{2}[0-9a-f]+)/config");
+    private static final Pattern CONFIG_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/config");
     
     private final ExecutorService executor = Executors.newFixedThreadPool(20);
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index c7e8633ab8e..52e0b4cf91e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
@@ -204,7 +205,7 @@ public final class RuleAlteredJobWorker {
         result.setNewVersion(event.getNewVersion());
         result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
         result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
-        result.extendConfiguration();
+        new RuleAlteredJobAPIImpl().extendJobConfiguration(result);
         return Optional.of(new RuleAlteredJobConfigurationSwapper().swapToObject(result));
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/RuleAlteredJobIdTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/RuleAlteredJobIdTest.java
deleted file mode 100644
index bdd1e99d59f..00000000000
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/RuleAlteredJobIdTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class RuleAlteredJobIdTest {
-    
-    @Test
-    public void assertSerialization() {
-        RuleAlteredJobId jobId = new RuleAlteredJobId();
-        jobId.setType(JobType.RULE_ALTERED.getValue());
-        jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
-        Pair<List<String>, List<String>> subTypesPair = getSubTypesPair();
-        jobId.setSubTypes(subTypesPair.getLeft());
-        jobId.setCurrentMetadataVersion(0);
-        jobId.setNewMetadataVersion(1);
-        jobId.setDatabaseName("sharding_db");
-        String hexText = jobId.marshal();
-        RuleAlteredJobId actual = RuleAlteredJobId.unmarshal(hexText);
-        assertThat(actual.getFormatVersion(), is(jobId.getFormatVersion()));
-        assertThat(actual.getSubTypes(), is(subTypesPair.getRight()));
-        assertThat(actual.getCurrentMetadataVersion(), is(jobId.getCurrentMetadataVersion()));
-        assertThat(actual.getNewMetadataVersion(), is(jobId.getNewMetadataVersion()));
-        assertThat(actual.getDatabaseName(), is(jobId.getDatabaseName()));
-    }
-    
-    private Pair<List<String>, List<String>> getSubTypesPair() {
-        return Pair.of(Arrays.asList(JobSubType.ENCRYPTION.getValue(), JobSubType.SCALING.getValue()),
-                Arrays.asList(JobSubType.SCALING.getValue(), JobSubType.ENCRYPTION.getValue()));
-    }
-}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 1900339730a..7fa35b89273 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -26,9 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl;
 import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
 
 import java.util.Collections;
@@ -49,15 +47,14 @@ public final class JobConfigurationBuilder {
         YamlRuleAlteredJobConfiguration result = new YamlRuleAlteredJobConfiguration();
         result.setDatabaseName("logic_db");
         result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")));
-        result.setActiveVersion(0);
-        result.setNewVersion(1);
+        int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
+        result.setActiveVersion(activeVersion);
+        result.setNewVersion(activeVersion + 1);
         // TODO add autoTables in config file
         result.setSource(createYamlPipelineDataSourceConfiguration(
                 new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
         result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
-        result.extendConfiguration();
-        int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
-        result.setJobId(generateJobId(activeVersion, "logic_db"));
+        new RuleAlteredJobAPIImpl().extendJobConfiguration(result);
         return new RuleAlteredJobConfigurationSwapper().swapToObject(result);
     }
     
@@ -67,15 +64,4 @@ public final class JobConfigurationBuilder {
         result.setParameter(config.getParameter());
         return result;
     }
-    
-    private static String generateJobId(final int activeVersion, final String databaseName) {
-        RuleAlteredJobId jobId = new RuleAlteredJobId();
-        jobId.setType(JobType.RULE_ALTERED.getValue());
-        jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
-        jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
-        jobId.setCurrentMetadataVersion(activeVersion);
-        jobId.setNewMetadataVersion(activeVersion + 1);
-        jobId.setDatabaseName(databaseName);
-        return jobId.marshal();
-    }
 }

Reply via email to