This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 46305879521 Extract start job method to PipelineJobAPI for common
usage (#20126)
46305879521 is described below
commit 46305879521d0f11289c98f6afcc161722c218bd
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 13 09:33:49 2022 +0800
Extract start job method to PipelineJobAPI for common usage (#20126)
* Rename getJobConfig and extendJobConfiguration
* Extract start job method to PipelineJobAPI
* Move YamlPipelineJobConfiguration
* Improve job item status persistence
---
.../data/pipeline/api/PipelineJobPublicAPI.java | 2 +-
.../{ => yaml}/YamlPipelineJobConfiguration.java | 2 +-
.../yaml/YamlRuleAlteredJobConfiguration.java | 2 +-
.../data/pipeline/core/api/PipelineJobAPI.java | 21 ++++--
.../data/pipeline/core/api/RuleAlteredJobAPI.java | 13 +---
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 40 ++++++++++-
.../core/api/impl/RuleAlteredJobAPIImpl.java | 77 +++++++---------------
.../core/task/InventoryIncrementalTasksRunner.java | 17 +++--
.../scenario/rulealtered/RuleAlteredJobWorker.java | 9 +--
.../core/fixture/RuleAlteredJobAPIFixture.java | 11 ++--
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 6 +-
.../core/util/JobConfigurationBuilder.java | 5 +-
12 files changed, 114 insertions(+), 91 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index efd9ce54bd8..dd8c9175c2f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
public interface PipelineJobPublicAPI extends TypedSPI {
/**
- * Start pipeline job by id.
+ * Start disabled job.
*
* @param jobId job id
*/
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
similarity index 94%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
index 1a39b7c854c..c0ab0ca640b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/YamlPipelineJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.job;
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
index 39840c6a08d..efc26ef8dfc 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/yaml/YamlRuleAlteredJobConfiguration.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import java.util.List;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index cdcaed2104e..adbe0df3bde 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -19,11 +19,13 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import java.util.Optional;
+
/**
* Pipeline job API.
*/
@@ -39,11 +41,19 @@ public interface PipelineJobAPI extends
PipelineJobPublicAPI, PipelineJobItemAPI
String marshalJobId(PipelineJobId pipelineJobId);
/**
- * Extend job configuration.
+ * Extend YAML job configuration.
+ *
+ * @param yamlJobConfig YAML job configuration
+ */
+ void extendYamlJobConfiguration(YamlPipelineJobConfiguration
yamlJobConfig);
+
+ /**
+ * Start job.
*
- * @param yamlJobConfig yaml job configuration
+ * @param jobConfig job configuration
+ * @return job id
*/
- void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
+ Optional<String> start(PipelineJobConfiguration jobConfig);
/**
* Get job configuration.
@@ -51,6 +61,5 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI,
PipelineJobItemAPI
* @param jobId job id
* @return job configuration
*/
- // TODO rename
- PipelineJobConfiguration getJobConfig(String jobId);
+ PipelineJobConfiguration getJobConfiguration(String jobId);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
index 7a8def8601d..468a0c950c7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
@@ -25,7 +25,6 @@ import
org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import java.util.Map;
-import java.util.Optional;
/**
* Rule altered job API.
@@ -33,13 +32,8 @@ import java.util.Optional;
@SingletonSPI
public interface RuleAlteredJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI, RequiredSPI {
- /**
- * Start scaling job by config.
- *
- * @param jobConfig job config
- * @return job id
- */
- Optional<String> start(RuleAlteredJobConfiguration jobConfig);
+ @Override
+ RuleAlteredJobConfiguration getJobConfiguration(String jobId);
/**
* Get job progress.
@@ -97,7 +91,4 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI
* @param jobConfig job configuration
*/
void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
-
- @Override
- RuleAlteredJobConfiguration getJobConfig(String jobId);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index aca2c9f609a..3da0bc5b75f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -18,16 +18,24 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.Optional;
/**
* Abstract pipeline job API impl.
@@ -44,6 +52,36 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
protected abstract String marshalJobIdLeftPart(PipelineJobId
pipelineJobId);
+ @Override
+ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobId();
+ if (0 == jobConfig.getJobShardingCount()) {
+ log.warn("Invalid job config since job sharding count is 0,
jobId={}", jobId);
+ throw new PipelineJobCreationException("job sharding count is 0,
jobId: " + jobId);
+ }
+ log.info("Start job by {}", jobConfig);
+ GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
+ String jobConfigKey =
PipelineMetaDataNode.getScalingJobConfigPath(jobId);
+ if (repositoryAPI.isExisted(jobConfigKey)) {
+ log.warn("jobId already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
+ return Optional.of(jobId);
+ }
+ repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId),
RuleAlteredJob.class.getName());
+ repositoryAPI.persist(jobConfigKey,
convertJobConfigurationToText(jobConfig));
+ return Optional.of(jobId);
+ }
+
+ private String convertJobConfigurationToText(final
PipelineJobConfiguration jobConfig) {
+ JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
+ jobConfigPOJO.setJobName(jobConfig.getJobId());
+ jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
+
jobConfigPOJO.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig)));
+ jobConfigPOJO.getProps().setProperty("create_time",
LocalDateTime.now().format(DATE_TIME_FORMATTER));
+ return YamlEngine.marshal(jobConfigPOJO);
+ }
+
+ protected abstract YamlPipelineJobConfiguration
swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);
+
@Override
public void startDisabledJob(final String jobId) {
log.info("Start disabled pipeline job {}", jobId);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 1f3576de972..60e170aaed8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -22,7 +22,8 @@ import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
@@ -43,13 +44,10 @@ import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
@@ -58,12 +56,10 @@ import
org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -92,7 +88,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
}
@Override
- public void extendJobConfiguration(final YamlPipelineJobConfiguration
yamlJobConfig) {
+ public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration
yamlJobConfig) {
YamlRuleAlteredJobConfiguration config =
(YamlRuleAlteredJobConfiguration) yamlJobConfig;
if (null == config.getJobShardingDataNodes()) {
RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(config);
@@ -120,6 +116,20 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
return marshalJobId(jobId);
}
+ @Override
+ protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final
PipelineJobConfiguration jobConfig) {
+ return new
RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration((RuleAlteredJobConfiguration)
jobConfig);
+ }
+
+ @Override
+ public RuleAlteredJobConfiguration getJobConfiguration(final String jobId)
{
+ return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ }
+
+ private RuleAlteredJobConfiguration getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
+ return
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+ }
+
@Override
public List<JobInfo> list() {
checkModeConfig();
@@ -139,7 +149,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
private JobInfo getJobInfo(final String jobName) {
JobInfo result = new JobInfo(jobName);
JobConfigurationPOJO jobConfigPOJO =
getElasticJobConfigPOJO(result.getJobId());
- RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+ RuleAlteredJobConfiguration jobConfig =
getJobConfiguration(jobConfigPOJO);
result.setActive(!jobConfigPOJO.isDisabled());
result.setShardingTotalCount(jobConfig.getJobShardingCount());
result.setTables(jobConfig.getLogicTables());
@@ -149,38 +159,10 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
return result;
}
- @Override
- public Optional<String> start(final RuleAlteredJobConfiguration jobConfig)
{
- if (0 == jobConfig.getJobShardingCount()) {
- log.warn("Invalid scaling job config!");
- throw new PipelineJobCreationException("handleConfig
shardingTotalCount is 0");
- }
- log.info("Start scaling job by {}", jobConfig);
- GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- String jobId = jobConfig.getJobId();
- String jobConfigKey =
PipelineMetaDataNode.getScalingJobConfigPath(jobId);
- if (repositoryAPI.isExisted(jobConfigKey)) {
- log.warn("jobId already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
- return Optional.of(jobId);
- }
- repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId),
RuleAlteredJob.class.getName());
- repositoryAPI.persist(jobConfigKey, createJobConfigText(jobConfig));
- return Optional.of(jobId);
- }
-
- private String createJobConfigText(final RuleAlteredJobConfiguration
jobConfig) {
- JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
- jobConfigPOJO.setJobName(jobConfig.getJobId());
- jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
- jobConfigPOJO.setJobParameter(YamlEngine.marshal(new
RuleAlteredJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
- jobConfigPOJO.getProps().setProperty("create_time",
LocalDateTime.now().format(DATE_TIME_FORMATTER));
- return YamlEngine.marshal(jobConfigPOJO);
- }
-
@Override
public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final String jobId) {
checkModeConfig();
- return getJobProgress(getJobConfig(jobId));
+ return getJobProgress(getJobConfiguration(jobId));
}
@Override
@@ -229,7 +211,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
checkModeConfig();
log.info("stopClusterWriteDB for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+ RuleAlteredJobConfiguration jobConfig =
getJobConfiguration(jobConfigPOJO);
verifyManualMode(jobConfig);
verifyJobNotStopped(jobConfigPOJO);
verifyJobNotCompleted(jobConfig);
@@ -257,7 +239,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
checkModeConfig();
log.info("restoreClusterWriteDB for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+ RuleAlteredJobConfiguration jobConfig =
getJobConfiguration(jobConfigPOJO);
verifyManualMode(jobConfig);
restoreClusterWriteDB(jobConfig);
}
@@ -290,7 +272,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
@Override
public boolean isDataConsistencyCheckNeeded(final String jobId) {
log.info("isDataConsistencyCheckNeeded for job {}", jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfig(jobId);
+ RuleAlteredJobConfiguration jobConfig = getJobConfiguration(jobId);
return isDataConsistencyCheckNeeded(jobConfig);
}
@@ -308,7 +290,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId) {
checkModeConfig();
log.info("Data consistency check for job {}", jobId);
- RuleAlteredJobConfiguration jobConfig =
getJobConfig(getElasticJobConfigPOJO(jobId));
+ RuleAlteredJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig);
}
@@ -327,7 +309,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
String jobId, final String algorithmType, final Properties algorithmProps) {
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}",
jobId, algorithmType);
- RuleAlteredJobConfiguration jobConfig =
getJobConfig(getElasticJobConfigPOJO(jobId));
+ RuleAlteredJobConfiguration jobConfig =
getJobConfiguration(getElasticJobConfigPOJO(jobId));
verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig,
DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType,
algorithmProps));
}
@@ -366,7 +348,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
checkModeConfig();
log.info("Switch cluster configuration for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+ RuleAlteredJobConfiguration jobConfig =
getJobConfiguration(jobConfigPOJO);
verifyManualMode(jobConfig);
verifyJobNotStopped(jobConfigPOJO);
verifyJobNotCompleted(jobConfig);
@@ -402,15 +384,6 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
verifyJobStopped(jobConfigPOJO);
}
- @Override
- public RuleAlteredJobConfiguration getJobConfig(final String jobId) {
- return getJobConfig(getElasticJobConfigPOJO(jobId));
- }
-
- private RuleAlteredJobConfiguration getJobConfig(final
JobConfigurationPOJO jobConfigPOJO) {
- return
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
- }
-
@Override
public String getType() {
return JobType.MIGRATION.getTypeName();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 5ea0c7045b4..e9b9c1566ac 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -26,6 +26,8 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
+import
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -39,6 +41,8 @@ import java.util.Collection;
@Slf4j
public final class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
+ private final PipelineJobItemAPI jobItemAPI = new
InventoryIncrementalJobItemAPIImpl();
+
@Getter
private final PipelineJobItemContext jobItemContext;
@@ -89,7 +93,7 @@ public final class InventoryIncrementalTasksRunner implements
PipelineTasksRunne
return true;
}
log.info("-------------- Start inventory task --------------");
- jobItemContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK);
+ updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
ExecuteCallback inventoryTaskCallback = createInventoryTaskCallback();
for (InventoryTask each : inventoryTasks) {
if (each.getTaskProgress().getPosition() instanceof
FinishedPosition) {
@@ -100,6 +104,11 @@ public final class InventoryIncrementalTasksRunner
implements PipelineTasksRunne
return false;
}
+ private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
+ jobItemContext.setStatus(jobStatus);
+ jobItemAPI.updateJobItemStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
+ }
+
private ExecuteCallback createInventoryTaskCallback() {
return new ExecuteCallback() {
@@ -114,7 +123,7 @@ public final class InventoryIncrementalTasksRunner
implements PipelineTasksRunne
@Override
public void onFailure(final Throwable throwable) {
log.error("Inventory task execute failed.", throwable);
-
jobItemContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
+
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
stop();
}
};
@@ -126,7 +135,7 @@ public final class InventoryIncrementalTasksRunner
implements PipelineTasksRunne
return;
}
log.info("-------------- Start incremental task --------------");
- jobItemContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
+ updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
ExecuteCallback incrementalTaskCallback =
createIncrementalTaskCallback();
for (IncrementalTask each : incrementalTasks) {
if (each.getTaskProgress().getPosition() instanceof
FinishedPosition) {
@@ -146,7 +155,7 @@ public final class InventoryIncrementalTasksRunner
implements PipelineTasksRunne
@Override
public void onFailure(final Throwable throwable) {
log.error("Incremental task execute failed.", throwable);
-
jobItemContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
+
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
stop();
}
};
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 069797aa427..0e35371c673 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
-import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
@@ -31,8 +30,10 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import
org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
@@ -43,10 +44,10 @@ import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCon
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
@@ -205,7 +206,7 @@ public final class RuleAlteredJobWorker {
result.setNewVersion(event.getNewVersion());
result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
- new RuleAlteredJobAPIImpl().extendJobConfiguration(result);
+
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
return Optional.of(new
RuleAlteredJobConfigurationSwapper().swapToObject(result));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
index df5abb4fefa..18562308a50 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
import java.util.Collection;
import java.util.List;
@@ -42,7 +43,7 @@ public final class RuleAlteredJobAPIFixture implements
RuleAlteredJobAPI {
}
@Override
- public void extendJobConfiguration(final YamlPipelineJobConfiguration
yamlJobConfig) {
+ public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration
yamlJobConfig) {
}
@Override
@@ -63,7 +64,7 @@ public final class RuleAlteredJobAPIFixture implements
RuleAlteredJobAPI {
}
@Override
- public Optional<String> start(final RuleAlteredJobConfiguration jobConfig)
{
+ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
return Optional.empty();
}
@@ -141,7 +142,7 @@ public final class RuleAlteredJobAPIFixture implements
RuleAlteredJobAPI {
}
@Override
- public RuleAlteredJobConfiguration getJobConfig(final String jobId) {
+ public RuleAlteredJobConfiguration getJobConfiguration(final String jobId)
{
return null;
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index ccd3a70d520..9a465a59670 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -131,7 +131,7 @@ public final class RuleAlteredJobAPIImplTest {
public void assertDataConsistencyCheck() {
Optional<String> jobId =
ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- RuleAlteredJobConfiguration jobConfig =
ruleAlteredJobAPI.getJobConfig(jobId.get());
+ RuleAlteredJobConfiguration jobConfig =
ruleAlteredJobAPI.getJobConfiguration(jobId.get());
if (null == jobConfig.getSource()) {
log.error("source is null, jobConfig={}",
YamlEngine.marshal(jobConfig));
}
@@ -146,7 +146,7 @@ public final class RuleAlteredJobAPIImplTest {
public void assertDataConsistencyCheckWithAlgorithm() {
Optional<String> jobId =
ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- RuleAlteredJobConfiguration jobConfig =
ruleAlteredJobAPI.getJobConfig(jobId.get());
+ RuleAlteredJobConfiguration jobConfig =
ruleAlteredJobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
ruleAlteredJobAPI.stopClusterWriteDB(jobConfig);
Map<String, DataConsistencyCheckResult> checkResultMap =
ruleAlteredJobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE", null);
@@ -228,7 +228,7 @@ public final class RuleAlteredJobAPIImplTest {
public void assertResetTargetTable() {
Optional<String> jobId =
ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- RuleAlteredJobConfiguration jobConfig =
ruleAlteredJobAPI.getJobConfig(jobId.get());
+ RuleAlteredJobConfiguration jobConfig =
ruleAlteredJobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
ruleAlteredJobAPI.stop(jobId.get());
ruleAlteredJobAPI.reset(jobId.get());
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 7fa35b89273..d20d2ca2f4e 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -26,7 +26,8 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.api.impl.RuleAlteredJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
import java.util.Collections;
@@ -54,7 +55,7 @@ public final class JobConfigurationBuilder {
result.setSource(createYamlPipelineDataSourceConfiguration(
new
ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
result.setTarget(createYamlPipelineDataSourceConfiguration(new
StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
- new RuleAlteredJobAPIImpl().extendJobConfiguration(result);
+
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
return new RuleAlteredJobConfigurationSwapper().swapToObject(result);
}