This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dfbdd3826fd Refactor RuleAlteredJobCenter for common usage (#19934)
dfbdd3826fd is described below

commit dfbdd3826fdc0528c22c040bb7a58c46b9230cb9
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Aug 7 10:19:16 2022 +0800

    Refactor RuleAlteredJobCenter for common usage (#19934)
---
 .../pipeline/api/context/PipelineJobContext.java   | 62 ++++++++++++++++++++++
 .../data/pipeline/api/job/PipelineJob.java         | 17 ++++++
 .../PipelineTasksRunner.java}                      | 25 +++++++--
 .../pipeline/core/api/GovernanceRepositoryAPI.java |  4 +-
 .../core/api/impl/GovernanceRepositoryAPIImpl.java |  4 +-
 .../core/api/impl/RuleAlteredJobAPIImpl.java       |  6 +--
 .../pipeline/core/execute/PipelineJobExecutor.java |  8 +--
 .../job/PipelineJobCenter.java}                    | 50 +++++++----------
 .../persist/PipelineJobProgressPersistService.java | 14 ++---
 .../scenario/rulealtered/RuleAlteredJob.java       | 17 ++++--
 .../rulealtered/RuleAlteredJobContext.java         |  4 +-
 .../rulealtered/RuleAlteredJobScheduler.java       |  9 ++--
 12 files changed, 158 insertions(+), 62 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
new file mode 100644
index 00000000000..2455161d0cf
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.context;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+
+/**
+ * Pipeline job context.
+ */
+public interface PipelineJobContext {
+    
+    /**
+     * Get job id.
+     *
+     * @return job id
+     */
+    String getJobId();
+    
+    /**
+     * Get sharding item.
+     *
+     * @return sharding item
+     */
+    int getShardingItem();
+    
+    /**
+     * Get job status.
+     *
+     * @return job status
+     */
+    JobStatus getStatus();
+    
+    /**
+     * Set job status.
+     *
+     * @param status job status
+     */
+    void setStatus(JobStatus status);
+    
+    /**
+     * Get job configuration.
+     *
+     * @return job configuration
+     */
+    PipelineJobConfiguration getJobConfig();
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
index 183c32856ea..67df4580546 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
@@ -17,8 +17,25 @@
 
 package org.apache.shardingsphere.data.pipeline.api.job;
 
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+
+import java.util.Optional;
+
 /**
  * Pipeline job.
  */
 public interface PipelineJob {
+    
+    /**
+     * Get tasks runner.
+     *
+     * @param shardingItem sharding item
+     * @return tasks runner
+     */
+    Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
+    
+    /**
+     * Stop job.
+     */
+    void stop();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/PipelineTasksRunner.java
similarity index 64%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/PipelineTasksRunner.java
index 183c32856ea..d9828fe0b10 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/PipelineTasksRunner.java
@@ -15,10 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.api.task;
+
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 
 /**
- * Pipeline job.
+ * Pipeline tasks runner.
  */
-public interface PipelineJob {
+public interface PipelineTasksRunner {
+    
+    /**
+     * Get job context.
+     *
+     * @return job context.
+     */
+    PipelineJobContext getJobContext();
+    
+    /**
+     * Start tasks runner.
+     */
+    void start();
+    
+    /**
+     * Stop tasks runner.
+     */
+    void stop();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 18eda82e821..fd60c290b46 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
 
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 import java.util.List;
@@ -43,7 +43,7 @@ public interface GovernanceRepositoryAPI {
      *
      * @param jobContext job context
      */
-    void persistJobProgress(RuleAlteredJobContext jobContext);
+    void persistJobProgress(PipelineJobContext jobContext);
     
     /**
      * Get job progress.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 4f6f3461d18..fd2ff4068d4 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
@@ -56,7 +57,8 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     }
     
     @Override
-    public void persistJobProgress(final RuleAlteredJobContext jobContext) {
+    public void persistJobProgress(final PipelineJobContext context) {
+        RuleAlteredJobContext jobContext = (RuleAlteredJobContext) context;
         JobProgress jobProgress = new JobProgress();
         jobProgress.setStatus(jobContext.getStatus());
         
jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 39530c84a01..f5609dec966 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -37,7 +37,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFail
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
@@ -321,11 +321,11 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         ScalingTaskFinishedEvent taskFinishedEvent = new 
ScalingTaskFinishedEvent(jobConfig.getDatabaseName(), 
jobConfig.getActiveVersion(), jobConfig.getNewVersion());
         
PipelineContext.getContextManager().getInstanceContext().getEventBusContext().post(taskFinishedEvent);
         // TODO rewrite job status update after job progress structure refactor
-        RuleAlteredJobCenter.updateJobStatus(jobId, JobStatus.FINISHED);
         for (int each : repositoryAPI.getShardingItems(jobId)) {
+            PipelineJobCenter.getJobContext(jobId, each).ifPresent(jobContext 
-> jobContext.setStatus(JobStatus.FINISHED));
             repositoryAPI.updateShardingJobStatus(jobId, each, 
JobStatus.FINISHED);
         }
-        RuleAlteredJobCenter.stop(jobId);
+        PipelineJobCenter.stop(jobId);
         stop(jobId);
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 7b3d8703dac..4933d16aef1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExe
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -81,13 +81,13 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
                 log.info("isJobSuccessful=true");
                 new RuleAlteredJobPreparer().cleanup(jobConfig);
             }
-            RuleAlteredJobCenter.stop(jobId);
+            PipelineJobCenter.stop(jobId);
             return;
         }
         switch (event.getType()) {
             case ADDED:
             case UPDATED:
-                if 
(RuleAlteredJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
+                if 
(PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
                     log.info("{} added to executing jobs failed since it 
already exists", jobConfigPOJO.getJobName());
                 } else {
                     log.info("{} executing jobs", jobConfigPOJO.getJobName());
@@ -101,7 +101,7 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
     
     private void execute(final JobConfigurationPOJO jobConfigPOJO) {
         RuleAlteredJob job = new RuleAlteredJob();
-        RuleAlteredJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, 
jobConfigPOJO.toJobConfiguration());
         oneOffJobBootstrap.execute();
         job.setOneOffJobBootstrap(oneOffJobBootstrap);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
similarity index 53%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
index fc9b4163707..54b2a195573 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
@@ -15,22 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 
-import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Rule altered job center.
+ * Pipeline job center.
  */
 @Slf4j
-public final class RuleAlteredJobCenter {
+public final class PipelineJobCenter {
     
-    private static final Map<String, RuleAlteredJob> JOB_MAP = new 
ConcurrentHashMap<>();
+    private static final Map<String, PipelineJob> JOB_MAP = new 
ConcurrentHashMap<>();
     
     /**
      * Add job.
@@ -38,7 +40,7 @@ public final class RuleAlteredJobCenter {
      * @param jobId job id
      * @param job job
      */
-    public static void addJob(final String jobId, final RuleAlteredJob job) {
+    public static void addJob(final String jobId, final PipelineJob job) {
         log.info("add job, jobId={}", jobId);
         JOB_MAP.put(jobId, job);
     }
@@ -59,7 +61,7 @@ public final class RuleAlteredJobCenter {
      * @param jobId job id
      */
     public static void stop(final String jobId) {
-        RuleAlteredJob job = JOB_MAP.get(jobId);
+        PipelineJob job = JOB_MAP.get(jobId);
         if (null == job) {
             log.info("job is null, ignore, jobId={}", jobId);
             return;
@@ -70,32 +72,18 @@ public final class RuleAlteredJobCenter {
     }
     
     /**
-     * Get job scheduler map.
+     * Get job context.
      *
      * @param jobId job id
-     * @return job scheduler
+     * @param shardingItem sharding item
+     * @return job context
      */
-    public static Map<Integer, RuleAlteredJobScheduler> 
getJobSchedulerMap(final String jobId) {
-        RuleAlteredJob ruleAlteredJob = JOB_MAP.get(jobId);
-        return ruleAlteredJob == null ? Collections.emptyMap() : 
ruleAlteredJob.getJobSchedulerMap();
-    }
-    
-    /**
-     * Update job status for all job sharding.
-     *
-     * @param jobId job id
-     * @param jobStatus job status
-     */
-    public static void updateJobStatus(final String jobId, final JobStatus 
jobStatus) {
-        RuleAlteredJob ruleAlteredJob = JOB_MAP.get(jobId);
-        if (null == ruleAlteredJob) {
-            log.info("job is null, ignore, jobId={}", jobId);
-            return;
-        }
-        Map<Integer, RuleAlteredJobScheduler> schedulerMap = 
ruleAlteredJob.getJobSchedulerMap();
-        log.info("updateJobStatus, shardingItems={}, jobStatus={}", 
schedulerMap.keySet(), jobStatus);
-        for (RuleAlteredJobScheduler each : schedulerMap.values()) {
-            each.getJobContext().setStatus(jobStatus);
+    public static Optional<PipelineJobContext> getJobContext(final String 
jobId, final int shardingItem) {
+        PipelineJob job = JOB_MAP.get(jobId);
+        if (null == job) {
+            return Optional.empty();
         }
+        Optional<PipelineTasksRunner> tasksRunner = 
job.getTasksRunner(shardingItem);
+        return tasksRunner.map(PipelineTasksRunner::getJobContext);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 8380f08f906..100a112bf93 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -18,15 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobScheduler;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -94,10 +95,9 @@ public final class PipelineJobProgressPersistService {
                 && !persistContext.getHasNewEvents().get()) {
             return;
         }
-        Map<Integer, RuleAlteredJobScheduler> schedulerMap = 
RuleAlteredJobCenter.getJobSchedulerMap(jobId);
-        RuleAlteredJobScheduler scheduler = schedulerMap.get(shardingItem);
-        if (null == scheduler) {
-            log.warn("persist, job schedule not exists, jobId={}, 
shardingItem={}", jobId, shardingItem);
+        Optional<PipelineJobContext> jobContext = 
PipelineJobCenter.getJobContext(jobId, shardingItem);
+        if (!jobContext.isPresent()) {
+            log.warn("persist, job context does not exist, jobId={}, 
shardingItem={}", jobId, shardingItem);
             return;
         }
         if (null == beforePersistingProgressMillis) {
@@ -105,7 +105,7 @@ public final class PipelineJobProgressPersistService {
         }
         persistContext.getHasNewEvents().set(false);
         long startTimeMillis = System.currentTimeMillis();
-        REPOSITORY_API.persistJobProgress(scheduler.getJobContext());
+        REPOSITORY_API.persistJobProgress(jobContext.get());
         persistContext.getBeforePersistingProgressMillis().set(null);
         if (6 == ThreadLocalRandom.current().nextInt(100)) {
             log.info("persist, jobId={}, shardingItem={}, cost time: {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index ede339c7606..e0ee494d5bf 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -29,11 +29,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -53,7 +55,7 @@ public final class RuleAlteredJob implements SimpleJob, 
PipelineJob {
     private final RuleAlteredJobPreparer jobPreparer = new 
RuleAlteredJobPreparer();
     
     @Getter
-    private final Map<Integer, RuleAlteredJobScheduler> jobSchedulerMap = new 
ConcurrentHashMap<>();
+    private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new 
ConcurrentHashMap<>();
     
     private volatile boolean stopping;
     
@@ -72,7 +74,7 @@ public final class RuleAlteredJob implements SimpleJob, 
PipelineJob {
         JobProgress initProgress = 
governanceRepositoryAPI.getJobProgress(shardingContext.getJobName(), 
shardingContext.getShardingItem());
         RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(), 
initProgress, dataSourceManager, jobPreparer);
         int shardingItem = jobContext.getShardingItem();
-        if (jobSchedulerMap.containsKey(shardingItem)) {
+        if (tasksRunnerMap.containsKey(shardingItem)) {
             // If the following log is output, it is possible that the 
elasticjob task was not closed correctly
             log.warn("schedulerMap contains shardingItem {}, ignore", 
shardingItem);
             return;
@@ -80,10 +82,15 @@ public final class RuleAlteredJob implements SimpleJob, 
PipelineJob {
         log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}", 
jobId, shardingItem);
         RuleAlteredJobScheduler jobScheduler = new 
RuleAlteredJobScheduler(jobContext);
         jobScheduler.start();
-        jobSchedulerMap.put(shardingItem, jobScheduler);
+        tasksRunnerMap.put(shardingItem, jobScheduler);
         PipelineJobProgressPersistService.addJobProgressPersistContext(jobId, 
shardingItem);
     }
     
+    @Override
+    public Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
+        return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
+    }
+    
     /**
      * Stop job.
      */
@@ -98,10 +105,10 @@ public final class RuleAlteredJob implements SimpleJob, 
PipelineJob {
             return;
         }
         log.info("stop job scheduler, jobId={}", jobId);
-        for (RuleAlteredJobScheduler each : jobSchedulerMap.values()) {
+        for (PipelineTasksRunner each : tasksRunnerMap.values()) {
             each.stop();
         }
-        jobSchedulerMap.clear();
+        tasksRunnerMap.clear();
         
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 65f9b35d133..08dd085a56a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -25,6 +25,7 @@ import 
org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -42,8 +43,7 @@ import java.util.LinkedList;
 @Getter
 @Setter
 @Slf4j
-// TODO extract JobContext
-public final class RuleAlteredJobContext {
+public final class RuleAlteredJobContext implements PipelineJobContext {
     
     private final String jobId;
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index de7f6fcac61..3ef58874873 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -26,8 +26,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 
 /**
  * Rule altered job scheduler.
@@ -35,8 +37,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 @Slf4j
 @RequiredArgsConstructor
 @Getter
-// TODO extract JobScheduler
-public final class RuleAlteredJobScheduler implements Runnable {
+public final class RuleAlteredJobScheduler implements PipelineTasksRunner, 
Runnable {
     
     private final RuleAlteredJobContext jobContext;
     
@@ -74,12 +75,12 @@ public final class RuleAlteredJobScheduler implements 
Runnable {
             jobContext.getJobPreparer().prepare(jobContext);
         } catch (final PipelineIgnoredException ex) {
             log.info("pipeline ignore exception: {}", ex.getMessage());
-            RuleAlteredJobCenter.stop(jobId);
+            PipelineJobCenter.stop(jobId);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
             log.error("job prepare failed, {}-{}", jobId, 
jobContext.getShardingItem(), ex);
-            RuleAlteredJobCenter.stop(jobId);
+            PipelineJobCenter.stop(jobId);
             jobContext.setStatus(JobStatus.PREPARING_FAILURE);
             governanceRepositoryAPI.persistJobProgress(jobContext);
             throw ex;

Reply via email to