This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 13bbf0528d9 Add PipelineJobIdUtils.getElasticJobConfigurationPOJO()
(#29036)
13bbf0528d9 is described below
commit 13bbf0528d9ef2d312f72fc3b79eee6c7ddcd27a
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 14 23:12:00 2023 +0800
Add PipelineJobIdUtils.getElasticJobConfigurationPOJO() (#29036)
---
.../data/pipeline/core/job/PipelineJobIdUtils.java | 16 ++++++++++++++++
.../pipeline/core/job/service/PipelineJobAPI.java | 9 +++++++++
.../impl/AbstractInventoryIncrementalJobAPIImpl.java | 4 ++--
.../job/service/impl/AbstractPipelineJobAPIImpl.java | 19 ++-----------------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 12 ++++++------
.../api/impl/ConsistencyCheckJobAPI.java | 6 +++---
.../scenario/migration/api/impl/MigrationJobAPI.java | 6 +++---
7 files changed, 41 insertions(+), 31 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index 0840073bf3d..5611ac02e1c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -29,6 +29,10 @@ import
org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import java.nio.charset.StandardCharsets;
@@ -86,4 +90,16 @@ public final class PipelineJobIdUtils {
String databaseName = new String(Hex.decodeHex(jobId.substring(10, 10
+ databaseNameLength)), StandardCharsets.UTF_8);
return new PipelineContextKey(databaseName,
InstanceTypeUtils.decode(instanceType));
}
+
+ /**
+ * Get ElasticJob configuration POJO.
+ *
+ * @param jobId job id
+ * @return ElasticJob configuration POJO
+ */
+ public static JobConfigurationPOJO getElasticJobConfigurationPOJO(final
String jobId) {
+ JobConfigurationPOJO result =
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
+ ShardingSpherePreconditions.checkNotNull(result, () -> new
PipelineJobNotFoundException(jobId));
+ return result;
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 9f7fd928521..44799e07817 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -95,6 +96,14 @@ public interface PipelineJobAPI extends TypedSPI {
*/
PipelineJobConfiguration getJobConfiguration(String jobId);
+ /**
+ * Get job configuration.
+ *
+ * @param jobConfigPOJO job configuration POJO
+ * @return pipeline job configuration
+ */
+ PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO
jobConfigPOJO);
+
/**
* Get pipeline job info.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index da4ead55272..391e3b8454e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -91,7 +91,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
extends AbstractPip
@Override
public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, each);
jobItemProgress.ifPresent(optional ->
optional.setActive(!jobConfigPOJO.isDisabled()));
@@ -101,7 +101,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
@Override
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String
jobId) {
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobConfiguration jobConfig =
getJobConfiguration(jobConfigPOJO);
long startTimeMillis =
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
getJobProgress(jobConfig);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index f2ef4ea1754..cdff9743e88 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -26,7 +26,6 @@ import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
@@ -62,13 +61,11 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
return Optional.of(jobId);
}
- protected abstract PipelineJobConfiguration
getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
-
@Override
public void startDisabledJob(final String jobId) {
PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), ()
-> new PipelineJobHasAlreadyStartedException(jobId));
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
@@ -85,7 +82,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
public void stop(final String jobId) {
PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
if (jobConfigPOJO.isDisabled()) {
return;
}
@@ -104,18 +101,6 @@ public abstract class AbstractPipelineJobAPIImpl
implements PipelineJobAPI {
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
}
- /**
- * Get ElasticJob configuration POJO.
- *
- * @param jobId job id
- * @return ElasticJob configuration POJO
- */
- public final JobConfigurationPOJO getElasticJobConfigPOJO(final String
jobId) {
- JobConfigurationPOJO result =
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
- ShardingSpherePreconditions.checkNotNull(result, () -> new
PipelineJobNotFoundException(jobId));
- return result;
- }
-
@Override
public String getJobItemErrorMessage(final String jobId, final int
shardingItem) {
return
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
shardingItem)).orElse("");
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 847987eb0b1..e4134a90c0f 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -203,7 +203,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
CDCJob job = new CDCJob(jobId, sink);
PipelineJobCenter.addJob(jobId, job);
updateJobConfigurationDisabled(jobId, false);
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
job, jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
@@ -216,7 +216,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
* @param disabled disabled
*/
public void updateJobConfigurationDisabled(final String jobId, final
boolean disabled) {
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
jobConfigPOJO.setDisabled(disabled);
if (disabled) {
jobConfigPOJO.getProps().setProperty("stop_time_millis",
String.valueOf(System.currentTimeMillis()));
@@ -279,17 +279,17 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
@Override
public CDCJobConfiguration getJobConfiguration(final String jobId) {
- return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ return
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}
@Override
- protected CDCJobConfiguration getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
+ public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO
jobConfigPOJO) {
return new
YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(jobConfigPOJO);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
return new TableBasedPipelineJobInfo(jobMetaData,
jobConfig.getDatabaseName(), String.join(", ",
jobConfig.getSchemaTableNames()));
@@ -305,7 +305,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
* @param jobId job id
*/
public void dropStreaming(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), ()
-> new PipelineInternalException("Can't drop streaming job which is active"));
dropJob(jobId);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index f267b56aced..6f825c9f17d 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -290,7 +290,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
String checkJobId = latestCheckJobId.get();
Optional<ConsistencyCheckJobItemProgress> progress =
getJobItemProgress(checkJobId, 0);
ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
- JobConfigurationPOJO jobConfigPOJO =
getElasticJobConfigPOJO(checkJobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId);
result.setActive(!jobConfigPOJO.isDisabled());
if (!progress.isPresent()) {
return result;
@@ -357,11 +357,11 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final String
jobId) {
- return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ return
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}
@Override
- protected ConsistencyCheckJobConfiguration getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
+ public ConsistencyCheckJobConfiguration getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
return new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index ce2e5116377..0cfed7317b4 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -210,7 +210,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(jobConfigPOJO);
List<String> sourceTables = new LinkedList<>();
getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each ->
each.getEntries().forEach(entry -> entry.getDataNodes()
@@ -228,11 +228,11 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
@Override
public MigrationJobConfiguration getJobConfiguration(final String jobId) {
- return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ return
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
}
@Override
- protected MigrationJobConfiguration getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
+ public MigrationJobConfiguration getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
return new
YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}