This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 61ee704c19e Refactor PipelineJobCenter (#29331)
61ee704c19e is described below

commit 61ee704c19e27ae95cb720b630ace79c8082c5e4
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 8 23:24:24 2023 +0800

    Refactor PipelineJobCenter (#29331)
    
    * Refactor PipelineJobCenter
    
    * Refactor PipelineJobCenter
    
    * Refactor PipelineJobCenter
    
    * Refactor PipelineJobCenter
    
    * Refactor PipelineJobCenter
---
 .../data/pipeline/core/job/PipelineJobCenter.java  | 64 ++++++++++------------
 .../persist/PipelineJobProgressPersistService.java |  2 +-
 .../AbstractJobConfigurationChangedProcessor.java  |  4 +-
 .../pipeline/core/job/PipelineJobCenterTest.java   | 20 +++----
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  2 +-
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  4 +-
 6 files changed, 44 insertions(+), 52 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
index 9dc82da6881..2fdffdaaa06 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 
@@ -33,79 +32,72 @@ import java.util.concurrent.ConcurrentHashMap;
  * Pipeline job center.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Slf4j
 public final class PipelineJobCenter {
     
-    private static final Map<String, PipelineJob> JOB_MAP = new 
ConcurrentHashMap<>();
+    private static final Map<String, PipelineJob> JOBS = new 
ConcurrentHashMap<>();
     
     /**
-     * Add job.
+     * Add pipeline job.
      *
-     * @param jobId job id
-     * @param job job
+     * @param jobId pipeline job id
+     * @param job pipeline job
      */
-    public static void addJob(final String jobId, final PipelineJob job) {
-        JOB_MAP.put(jobId, job);
+    public static void add(final String jobId, final PipelineJob job) {
+        JOBS.put(jobId, job);
     }
     
     /**
-     * Is job existing.
+     * Judge whether pipeline job existing.
      *
-     * @param jobId job id
-     * @return true when job exists, else false
+     * @param jobId pipeline job id
+     * @return pipeline job exists or not
      */
-    public static boolean isJobExisting(final String jobId) {
-        return JOB_MAP.containsKey(jobId);
+    public static boolean isExisting(final String jobId) {
+        return JOBS.containsKey(jobId);
     }
     
     /**
-     * Get job.
+     * Get pipeline job.
      *
-     * @param jobId job id
-     * @return job
+     * @param jobId pipeline job id
+     * @return pipeline job
      */
-    public static PipelineJob getJob(final String jobId) {
-        return JOB_MAP.get(jobId);
+    public static PipelineJob get(final String jobId) {
+        return JOBS.get(jobId);
     }
     
     /**
-     * Stop job.
+     * Stop pipeline job.
      *
-     * @param jobId job id
+     * @param jobId pipeline job id
      */
     public static void stop(final String jobId) {
-        PipelineJob job = JOB_MAP.get(jobId);
+        PipelineJob job = JOBS.get(jobId);
         if (null == job) {
             return;
         }
         job.stop();
-        JOB_MAP.remove(jobId);
+        JOBS.remove(jobId);
     }
     
     /**
-     * Get job item context.
+     * Get pipeline job item context.
      *
-     * @param jobId job id
+     * @param jobId pipeline job id
      * @param shardingItem sharding item
-     * @return job item context
+     * @return pipeline job item context
      */
-    public static Optional<PipelineJobItemContext> getJobItemContext(final 
String jobId, final int shardingItem) {
-        PipelineJob job = JOB_MAP.get(jobId);
-        if (null == job) {
-            return Optional.empty();
-        }
-        Optional<PipelineTasksRunner> tasksRunner = 
job.getTasksRunner(shardingItem);
-        return tasksRunner.map(PipelineTasksRunner::getJobItemContext);
+    public static Optional<PipelineJobItemContext> getItemContext(final String 
jobId, final int shardingItem) {
+        return JOBS.containsKey(jobId) ? 
JOBS.get(jobId).getTasksRunner(shardingItem).map(PipelineTasksRunner::getJobItemContext)
 : Optional.empty();
     }
     
     /**
      * Get sharding items.
      *
-     * @param jobId job id
-     * @return sharding items.
+     * @param jobId pipeline job id
+     * @return sharding items
      */
     public static Collection<Integer> getShardingItems(final String jobId) {
-        PipelineJob pipelineJob = JOB_MAP.get(jobId);
-        return null == pipelineJob ? Collections.emptyList() : 
pipelineJob.getShardingItems();
+        return JOBS.containsKey(jobId) ? JOBS.get(jobId).getShardingItems() : 
Collections.emptyList();
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index a2a1b9db263..64268bc27d9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -121,7 +121,7 @@ public final class PipelineJobProgressPersistService {
                     && !persistContext.getHasNewEvents().get()) {
                 return;
             }
-            Optional<PipelineJobItemContext> jobItemContext = 
PipelineJobCenter.getJobItemContext(jobId, shardingItem);
+            Optional<PipelineJobItemContext> jobItemContext = 
PipelineJobCenter.getItemContext(jobId, shardingItem);
             if (!jobItemContext.isPresent()) {
                 return;
             }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index ccb92aa53f5..d7d5882c6cf 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -57,7 +57,7 @@ public abstract class 
AbstractJobConfigurationChangedProcessor implements JobCon
         switch (eventType) {
             case ADDED:
             case UPDATED:
-                if (PipelineJobCenter.isJobExisting(jobId)) {
+                if (PipelineJobCenter.isExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it 
already exists", jobId);
                 } else {
                     executeJob(jobConfig);
@@ -81,7 +81,7 @@ public abstract class 
AbstractJobConfigurationChangedProcessor implements JobCon
     protected void executeJob(final JobConfiguration jobConfig) {
         String jobId = jobConfig.getJobName();
         AbstractPipelineJob job = buildPipelineJob(jobId);
-        PipelineJobCenter.addJob(jobId, job);
+        PipelineJobCenter.add(jobId, job);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfig);
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
index ee2b40be51e..01957a103af 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
@@ -41,12 +41,12 @@ class PipelineJobCenterTest {
     @Test
     void assertPipelineJobCenter() {
         PipelineJob pipelineJob = mock(PipelineJob.class);
-        PipelineJobCenter.addJob("Job1", pipelineJob);
-        assertTrue(PipelineJobCenter.isJobExisting("Job1"));
-        assertFalse(PipelineJobCenter.isJobExisting("Job2"));
-        assertNotNull(PipelineJobCenter.getJob("Job1"));
-        assertEquals(pipelineJob, PipelineJobCenter.getJob("Job1"));
-        assertNull(PipelineJobCenter.getJob("Job2"));
+        PipelineJobCenter.add("Job1", pipelineJob);
+        assertTrue(PipelineJobCenter.isExisting("Job1"));
+        assertFalse(PipelineJobCenter.isExisting("Job2"));
+        assertNotNull(PipelineJobCenter.get("Job1"));
+        assertEquals(pipelineJob, PipelineJobCenter.get("Job1"));
+        assertNull(PipelineJobCenter.get("Job2"));
         PipelineJobCenter.stop("Job1");
     }
     
@@ -57,11 +57,11 @@ class PipelineJobCenterTest {
         PipelineJobItemContext pipelineJobItemContext = 
mock(PipelineJobItemContext.class);
         
when(pipelineJob.getTasksRunner(anyInt())).thenReturn(Optional.of(pipelineTasksRunner));
         
when(pipelineTasksRunner.getJobItemContext()).thenReturn(pipelineJobItemContext);
-        PipelineJobCenter.addJob("Job1", pipelineJob);
-        Optional<PipelineJobItemContext> result = 
PipelineJobCenter.getJobItemContext("Job1", 1);
+        PipelineJobCenter.add("Job1", pipelineJob);
+        Optional<PipelineJobItemContext> result = 
PipelineJobCenter.getItemContext("Job1", 1);
         Optional<PipelineJobItemContext> optionalPipelineJobItemContext = 
Optional.ofNullable(pipelineJobItemContext);
         assertTrue(result.isPresent());
-        assertEquals(Optional.empty(), 
PipelineJobCenter.getJobItemContext("Job2", 1));
+        assertEquals(Optional.empty(), 
PipelineJobCenter.getItemContext("Job2", 1));
         assertEquals(optionalPipelineJobItemContext, result);
         PipelineJobCenter.stop("Job1");
     }
@@ -69,7 +69,7 @@ class PipelineJobCenterTest {
     @Test
     void assertGetShardingItems() {
         PipelineJob pipelineJob = mock(PipelineJob.class);
-        PipelineJobCenter.addJob("Job1", pipelineJob);
+        PipelineJobCenter.add("Job1", pipelineJob);
         when(pipelineJob.getShardingItems()).thenReturn(Arrays.asList(1, 2, 
3));
         Collection<Integer> shardingItems = pipelineJob.getShardingItems();
         Assertions.assertFalse(shardingItems.isEmpty());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 1a57c52c09c..4f26e82eca2 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -213,7 +213,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      */
     public void start(final String jobId, final PipelineSink sink) {
         CDCJob job = new CDCJob(jobId, sink);
-        PipelineJobCenter.addJob(jobId, job);
+        PipelineJobCenter.add(jobId, job);
         enable(jobId);
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfigPOJO.toJobConfiguration());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index e518026af1c..dc372106477 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -134,7 +134,7 @@ public final class CDCBackendHandler {
     public void startStreaming(final String jobId, final CDCConnectionContext 
connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = 
jobConfigManager.getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new 
PipelineJobNotFoundException(jobId));
-        if (PipelineJobCenter.isJobExisting(jobId)) {
+        if (PipelineJobCenter.isExisting(jobId)) {
             PipelineJobCenter.stop(jobId);
         }
         ShardingSphereDatabase database = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
@@ -153,7 +153,7 @@ public final class CDCBackendHandler {
             log.warn("job id is null or empty, ignored");
             return;
         }
-        CDCJob job = (CDCJob) PipelineJobCenter.getJob(jobId);
+        CDCJob job = (CDCJob) PipelineJobCenter.get(jobId);
         if (null == job) {
             return;
         }

Reply via email to