This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dfbdd3826fd Refactor RuleAlteredJobCenter for common usage (#19934)
dfbdd3826fd is described below
commit dfbdd3826fdc0528c22c040bb7a58c46b9230cb9
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Aug 7 10:19:16 2022 +0800
Refactor RuleAlteredJobCenter for common usage (#19934)
---
.../pipeline/api/context/PipelineJobContext.java | 62 ++++++++++++++++++++++
.../data/pipeline/api/job/PipelineJob.java | 17 ++++++
.../PipelineTasksRunner.java} | 25 +++++++--
.../pipeline/core/api/GovernanceRepositoryAPI.java | 4 +-
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 4 +-
.../core/api/impl/RuleAlteredJobAPIImpl.java | 6 +--
.../pipeline/core/execute/PipelineJobExecutor.java | 8 +--
.../job/PipelineJobCenter.java} | 50 +++++++----------
.../persist/PipelineJobProgressPersistService.java | 14 ++---
.../scenario/rulealtered/RuleAlteredJob.java | 17 ++++--
.../rulealtered/RuleAlteredJobContext.java | 4 +-
.../rulealtered/RuleAlteredJobScheduler.java | 9 ++--
12 files changed, 158 insertions(+), 62 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
new file mode 100644
index 00000000000..2455161d0cf
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.context;
+
+import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+
+/**
+ * Pipeline job context.
+ */
+public interface PipelineJobContext {
+
+ /**
+ * Get job id.
+ *
+ * @return job id
+ */
+ String getJobId();
+
+ /**
+ * Get sharding item.
+ *
+ * @return sharding item
+ */
+ int getShardingItem();
+
+ /**
+ * Get job status.
+ *
+ * @return job status
+ */
+ JobStatus getStatus();
+
+ /**
+ * Set job status.
+ *
+ * @param status job status
+ */
+ void setStatus(JobStatus status);
+
+ /**
+ * Get job configuration.
+ *
+ * @return job configuration
+ */
+ PipelineJobConfiguration getJobConfig();
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
index 183c32856ea..67df4580546 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
@@ -17,8 +17,25 @@
package org.apache.shardingsphere.data.pipeline.api.job;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+
+import java.util.Optional;
+
/**
* Pipeline job.
*/
public interface PipelineJob {
+
+ /**
+ * Get tasks runner.
+ *
+ * @param shardingItem sharding item
+ * @return tasks runner
+ */
+ Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
+
+ /**
+ * Stop job.
+ */
+ void stop();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/PipelineTasksRunner.java
similarity index 64%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/PipelineTasksRunner.java
index 183c32856ea..d9828fe0b10 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/PipelineTasksRunner.java
@@ -15,10 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.api.task;
+
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
/**
- * Pipeline job.
+ * Pipeline tasks runner.
*/
-public interface PipelineJob {
+public interface PipelineTasksRunner {
+
+ /**
+ * Get job context.
+ *
+ * @return job context.
+ */
+ PipelineJobContext getJobContext();
+
+ /**
+ * Start tasks runner.
+ */
+ void start();
+
+ /**
+ * Stop tasks runner.
+ */
+ void stop();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 18eda82e821..fd60c290b46 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import java.util.List;
@@ -43,7 +43,7 @@ public interface GovernanceRepositoryAPI {
*
* @param jobContext job context
*/
- void persistJobProgress(RuleAlteredJobContext jobContext);
+ void persistJobProgress(PipelineJobContext jobContext);
/**
* Get job progress.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 4f6f3461d18..fd2ff4068d4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
@@ -56,7 +57,8 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
}
@Override
- public void persistJobProgress(final RuleAlteredJobContext jobContext) {
+ public void persistJobProgress(final PipelineJobContext context) {
+ RuleAlteredJobContext jobContext = (RuleAlteredJobContext) context;
JobProgress jobProgress = new JobProgress();
jobProgress.setStatus(jobContext.getStatus());
jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 39530c84a01..f5609dec966 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -37,7 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFail
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
@@ -321,11 +321,11 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
ScalingTaskFinishedEvent taskFinishedEvent = new
ScalingTaskFinishedEvent(jobConfig.getDatabaseName(),
jobConfig.getActiveVersion(), jobConfig.getNewVersion());
PipelineContext.getContextManager().getInstanceContext().getEventBusContext().post(taskFinishedEvent);
// TODO rewrite job status update after job progress structure refactor
- RuleAlteredJobCenter.updateJobStatus(jobId, JobStatus.FINISHED);
for (int each : repositoryAPI.getShardingItems(jobId)) {
+ PipelineJobCenter.getJobContext(jobId, each).ifPresent(jobContext
-> jobContext.setStatus(JobStatus.FINISHED));
repositoryAPI.updateShardingJobStatus(jobId, each,
JobStatus.FINISHED);
}
- RuleAlteredJobCenter.stop(jobId);
+ PipelineJobCenter.stop(jobId);
stop(jobId);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 7b3d8703dac..4933d16aef1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExe
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -81,13 +81,13 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
log.info("isJobSuccessful=true");
new RuleAlteredJobPreparer().cleanup(jobConfig);
}
- RuleAlteredJobCenter.stop(jobId);
+ PipelineJobCenter.stop(jobId);
return;
}
switch (event.getType()) {
case ADDED:
case UPDATED:
- if
(RuleAlteredJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
+ if
(PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
log.info("{} added to executing jobs failed since it
already exists", jobConfigPOJO.getJobName());
} else {
log.info("{} executing jobs", jobConfigPOJO.getJobName());
@@ -101,7 +101,7 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
private void execute(final JobConfigurationPOJO jobConfigPOJO) {
RuleAlteredJob job = new RuleAlteredJob();
- RuleAlteredJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+ PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
oneOffJobBootstrap.execute();
job.setOneOffJobBootstrap(oneOffJobBootstrap);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
similarity index 53%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
index fc9b4163707..54b2a195573 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
@@ -15,22 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
- * Rule altered job center.
+ * Pipeline job center.
*/
@Slf4j
-public final class RuleAlteredJobCenter {
+public final class PipelineJobCenter {
- private static final Map<String, RuleAlteredJob> JOB_MAP = new
ConcurrentHashMap<>();
+ private static final Map<String, PipelineJob> JOB_MAP = new
ConcurrentHashMap<>();
/**
* Add job.
@@ -38,7 +40,7 @@ public final class RuleAlteredJobCenter {
* @param jobId job id
* @param job job
*/
- public static void addJob(final String jobId, final RuleAlteredJob job) {
+ public static void addJob(final String jobId, final PipelineJob job) {
log.info("add job, jobId={}", jobId);
JOB_MAP.put(jobId, job);
}
@@ -59,7 +61,7 @@ public final class RuleAlteredJobCenter {
* @param jobId job id
*/
public static void stop(final String jobId) {
- RuleAlteredJob job = JOB_MAP.get(jobId);
+ PipelineJob job = JOB_MAP.get(jobId);
if (null == job) {
log.info("job is null, ignore, jobId={}", jobId);
return;
@@ -70,32 +72,18 @@ public final class RuleAlteredJobCenter {
}
/**
- * Get job scheduler map.
+ * Get job context.
*
* @param jobId job id
- * @return job scheduler
+ * @param shardingItem sharding item
+ * @return job context
*/
- public static Map<Integer, RuleAlteredJobScheduler>
getJobSchedulerMap(final String jobId) {
- RuleAlteredJob ruleAlteredJob = JOB_MAP.get(jobId);
- return ruleAlteredJob == null ? Collections.emptyMap() :
ruleAlteredJob.getJobSchedulerMap();
- }
-
- /**
- * Update job status for all job sharding.
- *
- * @param jobId job id
- * @param jobStatus job status
- */
- public static void updateJobStatus(final String jobId, final JobStatus
jobStatus) {
- RuleAlteredJob ruleAlteredJob = JOB_MAP.get(jobId);
- if (null == ruleAlteredJob) {
- log.info("job is null, ignore, jobId={}", jobId);
- return;
- }
- Map<Integer, RuleAlteredJobScheduler> schedulerMap =
ruleAlteredJob.getJobSchedulerMap();
- log.info("updateJobStatus, shardingItems={}, jobStatus={}",
schedulerMap.keySet(), jobStatus);
- for (RuleAlteredJobScheduler each : schedulerMap.values()) {
- each.getJobContext().setStatus(jobStatus);
+ public static Optional<PipelineJobContext> getJobContext(final String
jobId, final int shardingItem) {
+ PipelineJob job = JOB_MAP.get(jobId);
+ if (null == job) {
+ return Optional.empty();
}
+ Optional<PipelineTasksRunner> tasksRunner =
job.getTasksRunner(shardingItem);
+ return tasksRunner.map(PipelineTasksRunner::getJobContext);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 8380f08f906..100a112bf93 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -18,15 +18,16 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobScheduler;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -94,10 +95,9 @@ public final class PipelineJobProgressPersistService {
&& !persistContext.getHasNewEvents().get()) {
return;
}
- Map<Integer, RuleAlteredJobScheduler> schedulerMap =
RuleAlteredJobCenter.getJobSchedulerMap(jobId);
- RuleAlteredJobScheduler scheduler = schedulerMap.get(shardingItem);
- if (null == scheduler) {
- log.warn("persist, job schedule not exists, jobId={},
shardingItem={}", jobId, shardingItem);
+ Optional<PipelineJobContext> jobContext =
PipelineJobCenter.getJobContext(jobId, shardingItem);
+ if (!jobContext.isPresent()) {
+ log.warn("persist, job context does not exist, jobId={},
shardingItem={}", jobId, shardingItem);
return;
}
if (null == beforePersistingProgressMillis) {
@@ -105,7 +105,7 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- REPOSITORY_API.persistJobProgress(scheduler.getJobContext());
+ REPOSITORY_API.persistJobProgress(jobContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost time: {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index ede339c7606..e0ee494d5bf 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -29,11 +29,13 @@ import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -53,7 +55,7 @@ public final class RuleAlteredJob implements SimpleJob,
PipelineJob {
private final RuleAlteredJobPreparer jobPreparer = new
RuleAlteredJobPreparer();
@Getter
- private final Map<Integer, RuleAlteredJobScheduler> jobSchedulerMap = new
ConcurrentHashMap<>();
+ private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new
ConcurrentHashMap<>();
private volatile boolean stopping;
@@ -72,7 +74,7 @@ public final class RuleAlteredJob implements SimpleJob,
PipelineJob {
JobProgress initProgress =
governanceRepositoryAPI.getJobProgress(shardingContext.getJobName(),
shardingContext.getShardingItem());
RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(),
initProgress, dataSourceManager, jobPreparer);
int shardingItem = jobContext.getShardingItem();
- if (jobSchedulerMap.containsKey(shardingItem)) {
+ if (tasksRunnerMap.containsKey(shardingItem)) {
// If the following log is output, it is possible that the
elasticjob task was not closed correctly
log.warn("schedulerMap contains shardingItem {}, ignore",
shardingItem);
return;
@@ -80,10 +82,15 @@ public final class RuleAlteredJob implements SimpleJob,
PipelineJob {
log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}",
jobId, shardingItem);
RuleAlteredJobScheduler jobScheduler = new
RuleAlteredJobScheduler(jobContext);
jobScheduler.start();
- jobSchedulerMap.put(shardingItem, jobScheduler);
+ tasksRunnerMap.put(shardingItem, jobScheduler);
PipelineJobProgressPersistService.addJobProgressPersistContext(jobId,
shardingItem);
}
+ @Override
+ public Optional<PipelineTasksRunner> getTasksRunner(final int
shardingItem) {
+ return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
+ }
+
/**
* Stop job.
*/
@@ -98,10 +105,10 @@ public final class RuleAlteredJob implements SimpleJob,
PipelineJob {
return;
}
log.info("stop job scheduler, jobId={}", jobId);
- for (RuleAlteredJobScheduler each : jobSchedulerMap.values()) {
+ for (PipelineTasksRunner each : tasksRunnerMap.values()) {
each.stop();
}
- jobSchedulerMap.clear();
+ tasksRunnerMap.clear();
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 65f9b35d133..08dd085a56a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -25,6 +25,7 @@ import
org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -42,8 +43,7 @@ import java.util.LinkedList;
@Getter
@Setter
@Slf4j
-// TODO extract JobContext
-public final class RuleAlteredJobContext {
+public final class RuleAlteredJobContext implements PipelineJobContext {
private final String jobId;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index de7f6fcac61..3ef58874873 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -26,8 +26,10 @@ import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
/**
* Rule altered job scheduler.
@@ -35,8 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@Slf4j
@RequiredArgsConstructor
@Getter
-// TODO extract JobScheduler
-public final class RuleAlteredJobScheduler implements Runnable {
+public final class RuleAlteredJobScheduler implements PipelineTasksRunner,
Runnable {
private final RuleAlteredJobContext jobContext;
@@ -74,12 +75,12 @@ public final class RuleAlteredJobScheduler implements
Runnable {
jobContext.getJobPreparer().prepare(jobContext);
} catch (final PipelineIgnoredException ex) {
log.info("pipeline ignore exception: {}", ex.getMessage());
- RuleAlteredJobCenter.stop(jobId);
+ PipelineJobCenter.stop(jobId);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
log.error("job prepare failed, {}-{}", jobId,
jobContext.getShardingItem(), ex);
- RuleAlteredJobCenter.stop(jobId);
+ PipelineJobCenter.stop(jobId);
jobContext.setStatus(JobStatus.PREPARING_FAILURE);
governanceRepositoryAPI.persistJobProgress(jobContext);
throw ex;