This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 13bbf0528d9 Add PipelineJobIdUtils.getElasticJobConfigurationPOJO() 
(#29036)
13bbf0528d9 is described below

commit 13bbf0528d9ef2d312f72fc3b79eee6c7ddcd27a
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 14 23:12:00 2023 +0800

    Add PipelineJobIdUtils.getElasticJobConfigurationPOJO() (#29036)
---
 .../data/pipeline/core/job/PipelineJobIdUtils.java    | 16 ++++++++++++++++
 .../pipeline/core/job/service/PipelineJobAPI.java     |  9 +++++++++
 .../impl/AbstractInventoryIncrementalJobAPIImpl.java  |  4 ++--
 .../job/service/impl/AbstractPipelineJobAPIImpl.java  | 19 ++-----------------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java         | 12 ++++++------
 .../api/impl/ConsistencyCheckJobAPI.java              |  6 +++---
 .../scenario/migration/api/impl/MigrationJobAPI.java  |  6 +++---
 7 files changed, 41 insertions(+), 31 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index 0840073bf3d..5611ac02e1c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -29,6 +29,10 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 
 import java.nio.charset.StandardCharsets;
@@ -86,4 +90,16 @@ public final class PipelineJobIdUtils {
         String databaseName = new String(Hex.decodeHex(jobId.substring(10, 10 
+ databaseNameLength)), StandardCharsets.UTF_8);
         return new PipelineContextKey(databaseName, 
InstanceTypeUtils.decode(instanceType));
     }
+    
+    /**
+     * Get ElasticJob configuration POJO.
+     *
+     * @param jobId job id
+     * @return ElasticJob configuration POJO
+     */
+    public static JobConfigurationPOJO getElasticJobConfigurationPOJO(final 
String jobId) {
+        JobConfigurationPOJO result = 
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
+        ShardingSpherePreconditions.checkNotNull(result, () -> new 
PipelineJobNotFoundException(jobId));
+        return result;
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 9f7fd928521..44799e07817 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
@@ -95,6 +96,14 @@ public interface PipelineJobAPI extends TypedSPI {
      */
     PipelineJobConfiguration getJobConfiguration(String jobId);
     
+    /**
+     * Get job configuration.
+     *
+     * @param jobConfigPOJO job configuration POJO
+     * @return pipeline job configuration
+     */
+    PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO 
jobConfigPOJO);
+    
     /**
      * Get pipeline job info.
      * 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index da4ead55272..391e3b8454e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -91,7 +91,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
extends AbstractPip
     @Override
     public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final PipelineJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         return IntStream.range(0, 
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, 
each) -> {
             Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, each);
             jobItemProgress.ifPresent(optional -> 
optional.setActive(!jobConfigPOJO.isDisabled()));
@@ -101,7 +101,7 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     
     @Override
     public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String 
jobId) {
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         PipelineJobConfiguration jobConfig = 
getJobConfiguration(jobConfigPOJO);
         long startTimeMillis = 
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index f2ef4ea1754..cdff9743e88 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -26,7 +26,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
@@ -62,13 +61,11 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
         return Optional.of(jobId);
     }
     
-    protected abstract PipelineJobConfiguration 
getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
-    
     @Override
     public void startDisabledJob(final String jobId) {
         PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
         
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () 
-> new PipelineJobHasAlreadyStartedException(jobId));
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
@@ -85,7 +82,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     public void stop(final String jobId) {
         PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
         
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         if (jobConfigPOJO.isDisabled()) {
             return;
         }
@@ -104,18 +101,6 @@ public abstract class AbstractPipelineJobAPIImpl 
implements PipelineJobAPI {
         
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
     }
     
-    /**
-     * Get ElasticJob configuration POJO.
-     *
-     * @param jobId job id
-     * @return ElasticJob configuration POJO
-     */
-    public final JobConfigurationPOJO getElasticJobConfigPOJO(final String 
jobId) {
-        JobConfigurationPOJO result = 
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
-        ShardingSpherePreconditions.checkNotNull(result, () -> new 
PipelineJobNotFoundException(jobId));
-        return result;
-    }
-    
     @Override
     public String getJobItemErrorMessage(final String jobId, final int 
shardingItem) {
         return 
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
 shardingItem)).orElse("");
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 847987eb0b1..e4134a90c0f 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -203,7 +203,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         CDCJob job = new CDCJob(jobId, sink);
         PipelineJobCenter.addJob(jobId, job);
         updateJobConfigurationDisabled(jobId, false);
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfigPOJO.toJobConfiguration());
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
@@ -216,7 +216,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
      * @param disabled disabled
      */
     public void updateJobConfigurationDisabled(final String jobId, final 
boolean disabled) {
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         jobConfigPOJO.setDisabled(disabled);
         if (disabled) {
             jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
@@ -279,17 +279,17 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     public CDCJobConfiguration getJobConfiguration(final String jobId) {
-        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+        return 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
     }
     
     @Override
-    protected CDCJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
+    public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO 
jobConfigPOJO) {
         return new 
YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
     @Override
     public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(jobConfigPOJO);
         CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
         return new TableBasedPipelineJobInfo(jobMetaData, 
jobConfig.getDatabaseName(), String.join(", ", 
jobConfig.getSchemaTableNames()));
@@ -305,7 +305,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
      * @param jobId job id
      */
     public void dropStreaming(final String jobId) {
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () 
-> new PipelineInternalException("Can't drop streaming job which is active"));
         dropJob(jobId);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index f267b56aced..6f825c9f17d 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -290,7 +290,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
         String checkJobId = latestCheckJobId.get();
         Optional<ConsistencyCheckJobItemProgress> progress = 
getJobItemProgress(checkJobId, 0);
         ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
-        JobConfigurationPOJO jobConfigPOJO = 
getElasticJobConfigPOJO(checkJobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId);
         result.setActive(!jobConfigPOJO.isDisabled());
         if (!progress.isPresent()) {
             return result;
@@ -357,11 +357,11 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
     
     @Override
     public ConsistencyCheckJobConfiguration getJobConfiguration(final String 
jobId) {
-        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+        return 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
     }
     
     @Override
-    protected ConsistencyCheckJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
+    public ConsistencyCheckJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
         return new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index ce2e5116377..0cfed7317b4 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -210,7 +210,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     
     @Override
     public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(jobConfigPOJO);
         List<String> sourceTables = new LinkedList<>();
         
getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> 
each.getEntries().forEach(entry -> entry.getDataNodes()
@@ -228,11 +228,11 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     
     @Override
     public MigrationJobConfiguration getJobConfiguration(final String jobId) {
-        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+        return 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
     }
     
     @Override
-    protected MigrationJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
+    public MigrationJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
         return new 
YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     

Reply via email to