This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new adc9aab Refactor scaling job progress deletion in reset and progress
check before starting job (#15465)
adc9aab is described below
commit adc9aab43fc06e4b35aa3122b9d4f03505a6a9cd
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Feb 18 11:49:01 2022 +0800
Refactor scaling job progress deletion in reset and progress check before
starting job (#15465)
* progress might be null
* Remove deleteJobProgress method
---
.../data/pipeline/core/api/GovernanceRepositoryAPI.java | 7 -------
.../data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java | 6 ------
.../data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java | 1 -
.../data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java | 2 +-
4 files changed, 1 insertion(+), 15 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 0866ffc..63c4a1a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -63,13 +63,6 @@ public interface GovernanceRepositoryAPI {
Optional<Boolean> getJobCheckResult(String jobId);
/**
- * Delete job progress.
- *
- * @param jobId job id
- */
- void deleteJobProgress(String jobId);
-
- /**
* Delete job.
*
* @param jobId job id
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 338567a..efee2d1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -105,12 +105,6 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
}
@Override
- public void deleteJobProgress(final String jobId) {
- log.info("delete job progress {}", jobId);
- repository.delete(String.format("%s/%s/offset", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId));
- }
-
- @Override
public void deleteJob(final String jobId) {
log.info("delete job {}", jobId);
repository.delete(String.format("%s/%s",
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index f9e35d8..d78785f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -336,7 +336,6 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
public void reset(final String jobId) {
checkModeConfig();
log.info("Scaling job {} reset target table", jobId);
- PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJobProgress(jobId);
try {
new
ScalingEnvironmentManager().cleanupTargetTables(getJobConfig(jobId));
} catch (final SQLException ex) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index f30691b..b13fd21 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -297,7 +297,7 @@ public final class RuleAlteredJobWorker {
boolean isUncompletedJobOfSameSchema = false;
for (JobInfo each :
PipelineJobAPIFactory.getRuleAlteredJobAPI().list()) {
if
(PipelineJobAPIFactory.getRuleAlteredJobAPI().getProgress(each.getJobId()).values().stream()
- .allMatch(progress -> progress.getStatus().equals(JobStatus.FINISHED))) {
+ .allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
continue;
}
JobConfiguration jobConfiguration =
YamlEngine.unmarshal(each.getJobParameter(), JobConfiguration.class, true);