This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 92dbc4bcce2 Move AbstractPipelineJobAPIImpl.convertJobConfiguration
and swapToYamlJobConfiguration to PipelineJobConfiguration (#29030)
92dbc4bcce2 is described below
commit 92dbc4bcce27671c4dd201b4de678a6c7956e442
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 13 23:34:36 2023 +0800
Move AbstractPipelineJobAPIImpl.convertJobConfiguration and
swapToYamlJobConfiguration to PipelineJobConfiguration (#29030)
* Use PipelineJobAPI.getPipelineJobClass() instead of
AbstractPipelineJobAPIImpl.getJobClassName()
* Move AbstractPipelineJobAPIImpl.convertJobConfiguration and
swapToYamlJobConfiguration to PipelineJobConfiguration
---
.../config/job/PipelineJobConfiguration.java | 35 ++++++++++++++++++++++
.../service/impl/AbstractPipelineJobAPIImpl.java | 20 +------------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 14 +--------
.../cdc/config/job/CDCJobConfiguration.java | 15 ++++++++++
.../api/impl/ConsistencyCheckJobAPI.java | 5 ----
.../config/ConsistencyCheckJobConfiguration.java | 12 ++++----
.../migration/api/impl/MigrationJobAPI.java | 5 ----
.../config/MigrationJobConfiguration.java | 7 +++++
8 files changed, 66 insertions(+), 47 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
index e6f6516b7f4..511c10d2fb9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
@@ -17,13 +17,23 @@
package org.apache.shardingsphere.data.pipeline.common.config.job;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
/**
* Pipeline job configuration.
*/
public interface PipelineJobConfiguration {
+ DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
/**
* Get job id.
*
@@ -44,4 +54,29 @@ public interface PipelineJobConfiguration {
* @return source database type
*/
DatabaseType getSourceDatabaseType();
+
+ /**
+ * Convert to job configuration POJO.
+ *
+ * @return converted job configuration POJO
+ */
+ default JobConfigurationPOJO convertToJobConfigurationPOJO() {
+ JobConfigurationPOJO result = new JobConfigurationPOJO();
+ result.setJobName(getJobId());
+ result.setShardingTotalCount(getJobShardingCount());
+
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration()));
+ String createTimeFormat =
LocalDateTime.now().format(DATE_TIME_FORMATTER);
+ result.getProps().setProperty("create_time", createTimeFormat);
+ result.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
+ result.getProps().setProperty("run_count", "1");
+
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
+ return result;
+ }
+
+ /**
+ * Swap to YAML pipeline job configuration.
+ *
+ * @return swapped YAML pipeline job configuration
+ */
+ YamlPipelineJobConfiguration swapToYamlJobConfiguration();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index fa8a1f78e25..f2ef4ea1754 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -20,9 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.job.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -38,7 +36,6 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -61,25 +58,10 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
return Optional.of(jobId);
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
getPipelineJobClass().getName());
- repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(convertJobConfiguration(jobConfig)));
+ repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
return Optional.of(jobId);
}
- protected JobConfigurationPOJO convertJobConfiguration(final
PipelineJobConfiguration jobConfig) {
- JobConfigurationPOJO result = new JobConfigurationPOJO();
- result.setJobName(jobConfig.getJobId());
- result.setShardingTotalCount(jobConfig.getJobShardingCount());
-
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig)));
- String createTimeFormat =
LocalDateTime.now().format(DATE_TIME_FORMATTER);
- result.getProps().setProperty("create_time", createTimeFormat);
- result.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
- result.getProps().setProperty("run_count", "1");
-
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
- return result;
- }
-
- protected abstract YamlPipelineJobConfiguration
swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);
-
protected abstract PipelineJobConfiguration
getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 9c99ae5f096..847987eb0b1 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -123,7 +123,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
log.warn("CDC job already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
} else {
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getPipelineJobClass().getName());
- JobConfigurationPOJO jobConfigPOJO =
convertJobConfiguration(jobConfig);
+ JobConfigurationPOJO jobConfigPOJO =
jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(jobConfigPOJO));
if (!param.isFull()) {
@@ -193,13 +193,6 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
return result;
}
- @Override
- protected JobConfigurationPOJO convertJobConfiguration(final
PipelineJobConfiguration jobConfig) {
- JobConfigurationPOJO result = super.convertJobConfiguration(jobConfig);
- result.setShardingTotalCount(1);
- return result;
- }
-
/**
* Start job.
*
@@ -294,11 +287,6 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
return new
YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
- @Override
- protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final
PipelineJobConfiguration jobConfig) {
- return new
YamlCDCJobConfigurationSwapper().swapToYamlConfiguration((CDCJobConfiguration)
jobConfig);
- }
-
@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 8e0aa41737f..c15c38e6243 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -21,8 +21,11 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.List;
@@ -64,6 +67,18 @@ public final class CDCJobConfiguration implements
PipelineJobConfiguration {
return jobShardingDataNodes.size();
}
+ @Override
+ public JobConfigurationPOJO convertToJobConfigurationPOJO() {
+ JobConfigurationPOJO result =
PipelineJobConfiguration.super.convertToJobConfigurationPOJO();
+ result.setShardingTotalCount(1);
+ return result;
+ }
+
+ @Override
+ public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
+ return new
YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(this);
+ }
+
@RequiredArgsConstructor
@Getter
public static class SinkConfiguration {
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 58d4051ea08..f267b56aced 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -365,11 +365,6 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
return new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
- @Override
- protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final
PipelineJobConfiguration jobConfig) {
- return new
YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration((ConsistencyCheckJobConfiguration)
jobConfig);
- }
-
@Override
public void extendYamlJobConfiguration(final PipelineContextKey
contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
index a969417d96f..578eeeed0e0 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
@@ -21,6 +21,8 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.Properties;
@@ -43,13 +45,13 @@ public final class ConsistencyCheckJobConfiguration
implements PipelineJobConfig
private final DatabaseType sourceDatabaseType;
- /**
- * Get job sharding count.
- *
- * @return job sharding count
- */
@Override
public int getJobShardingCount() {
return 1;
}
+
+ @Override
+ public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
+ return new
YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(this);
+ }
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 4a6a2b2c37c..ce2e5116377 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -226,11 +226,6 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
}
}
- @Override
- protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final
PipelineJobConfiguration jobConfig) {
- return new
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration)
jobConfig);
- }
-
@Override
public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
index 9ef3c648b07..f45a30d512c 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
@@ -22,7 +22,9 @@ import lombok.RequiredArgsConstructor;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.List;
@@ -67,4 +69,9 @@ public final class MigrationJobConfiguration implements
PipelineJobConfiguration
public int getJobShardingCount() {
return jobShardingDataNodes.size();
}
+
+ @Override
+ public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
+ return new
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(this);
+ }
}