This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bcc4a28d4aa Rename PipelineJobRegistry (#29332)
bcc4a28d4aa is described below

commit bcc4a28d4aab41a53f10ad4aaa340465b1a6ff84
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 8 23:55:31 2023 +0800

    Rename PipelineJobRegistry (#29332)
---
 ...lineJobCenter.java => PipelineJobRegistry.java} |   4 +-
 .../persist/PipelineJobProgressPersistService.java |   4 +-
 .../AbstractJobConfigurationChangedProcessor.java  |  10 +-
 .../pipeline/core/job/PipelineJobCenterTest.java   |  80 ---------------
 .../pipeline/core/job/PipelineJobRegistryTest.java | 108 +++++++++++++++++++++
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |   6 +-
 .../data/pipeline/cdc/api/CDCJobAPI.java           |   4 +-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |   4 +-
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  10 +-
 .../migration/preparer/MigrationJobPreparer.java   |   8 +-
 10 files changed, 133 insertions(+), 105 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
similarity index 97%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
index 2fdffdaaa06..7d812cc1c21 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
@@ -29,10 +29,10 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Pipeline job center.
+ * Pipeline job registry.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineJobCenter {
+public final class PipelineJobRegistry {
     
     private static final Map<String, PipelineJob> JOBS = new 
ConcurrentHashMap<>();
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 64268bc27d9..2e78a141101 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -22,7 +22,7 @@ import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -121,7 +121,7 @@ public final class PipelineJobProgressPersistService {
                     && !persistContext.getHasNewEvents().get()) {
                 return;
             }
-            Optional<PipelineJobItemContext> jobItemContext = 
PipelineJobCenter.getItemContext(jobId, shardingItem);
+            Optional<PipelineJobItemContext> jobItemContext = 
PipelineJobRegistry.getItemContext(jobId, shardingItem);
             if (!jobItemContext.isPresent()) {
                 return;
             }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index d7d5882c6cf..f128bb4d6b3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDa
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
@@ -47,8 +47,8 @@ public abstract class 
AbstractJobConfigurationChangedProcessor implements JobCon
         }
         String jobId = jobConfig.getJobName();
         if (disabled || deleted) {
-            Collection<Integer> jobItems = 
PipelineJobCenter.getShardingItems(jobId);
-            PipelineJobCenter.stop(jobId);
+            Collection<Integer> jobItems = 
PipelineJobRegistry.getShardingItems(jobId);
+            PipelineJobRegistry.stop(jobId);
             if (disabled) {
                 onDisabled(jobConfig, jobItems);
             }
@@ -57,7 +57,7 @@ public abstract class 
AbstractJobConfigurationChangedProcessor implements JobCon
         switch (eventType) {
             case ADDED:
             case UPDATED:
-                if (PipelineJobCenter.isExisting(jobId)) {
+                if (PipelineJobRegistry.isExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it 
already exists", jobId);
                 } else {
                     executeJob(jobConfig);
@@ -81,7 +81,7 @@ public abstract class 
AbstractJobConfigurationChangedProcessor implements JobCon
     protected void executeJob(final JobConfiguration jobConfig) {
         String jobId = jobConfig.getJobName();
         AbstractPipelineJob job = buildPipelineJob(jobId);
-        PipelineJobCenter.add(jobId, job);
+        PipelineJobRegistry.add(jobId, job);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfig);
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
deleted file mode 100644
index 01957a103af..00000000000
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job;
-
-import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Optional;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-class PipelineJobCenterTest {
-    
-    @Test
-    void assertPipelineJobCenter() {
-        PipelineJob pipelineJob = mock(PipelineJob.class);
-        PipelineJobCenter.add("Job1", pipelineJob);
-        assertTrue(PipelineJobCenter.isExisting("Job1"));
-        assertFalse(PipelineJobCenter.isExisting("Job2"));
-        assertNotNull(PipelineJobCenter.get("Job1"));
-        assertEquals(pipelineJob, PipelineJobCenter.get("Job1"));
-        assertNull(PipelineJobCenter.get("Job2"));
-        PipelineJobCenter.stop("Job1");
-    }
-    
-    @Test
-    void assertGetJobItemContext() {
-        PipelineJob pipelineJob = mock(PipelineJob.class);
-        PipelineTasksRunner pipelineTasksRunner = 
mock(PipelineTasksRunner.class);
-        PipelineJobItemContext pipelineJobItemContext = 
mock(PipelineJobItemContext.class);
-        
when(pipelineJob.getTasksRunner(anyInt())).thenReturn(Optional.of(pipelineTasksRunner));
-        
when(pipelineTasksRunner.getJobItemContext()).thenReturn(pipelineJobItemContext);
-        PipelineJobCenter.add("Job1", pipelineJob);
-        Optional<PipelineJobItemContext> result = 
PipelineJobCenter.getItemContext("Job1", 1);
-        Optional<PipelineJobItemContext> optionalPipelineJobItemContext = 
Optional.ofNullable(pipelineJobItemContext);
-        assertTrue(result.isPresent());
-        assertEquals(Optional.empty(), 
PipelineJobCenter.getItemContext("Job2", 1));
-        assertEquals(optionalPipelineJobItemContext, result);
-        PipelineJobCenter.stop("Job1");
-    }
-    
-    @Test
-    void assertGetShardingItems() {
-        PipelineJob pipelineJob = mock(PipelineJob.class);
-        PipelineJobCenter.add("Job1", pipelineJob);
-        when(pipelineJob.getShardingItems()).thenReturn(Arrays.asList(1, 2, 
3));
-        Collection<Integer> shardingItems = pipelineJob.getShardingItems();
-        Assertions.assertFalse(shardingItems.isEmpty());
-        Assertions.assertEquals(Arrays.asList(1, 2, 3), 
PipelineJobCenter.getShardingItems("Job1"));
-        assertEquals(Collections.EMPTY_LIST, 
PipelineJobCenter.getShardingItems("Job2"));
-        PipelineJobCenter.stop("Job1");
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
new file mode 100644
index 00000000000..adbca271a20
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class PipelineJobRegistryTest {
+    
+    @Mock
+    private PipelineJob job;
+    
+    @BeforeEach
+    void setUp() {
+        PipelineJobRegistry.add("foo_job", job);
+    }
+    
+    @AfterEach
+    void reset() {
+        PipelineJobRegistry.stop("foo_job");
+    }
+    
+    @Test
+    void assertAdd() {
+        assertFalse(PipelineJobRegistry.isExisting("bar_job"));
+        PipelineJobRegistry.add("bar_job", mock(PipelineJob.class));
+        assertTrue(PipelineJobRegistry.isExisting("bar_job"));
+    }
+    
+    @Test
+    void assertIsExisting() {
+        assertTrue(PipelineJobRegistry.isExisting("foo_job"));
+    }
+    
+    @Test
+    void assertGet() {
+        assertThat(PipelineJobRegistry.get("foo_job"), is(job));
+    }
+    
+    @Test
+    void assertStop() {
+        PipelineJobRegistry.stop("foo_job");
+        verify(job).stop();
+        assertFalse(PipelineJobRegistry.isExisting("foo_job"));
+    }
+    
+    @Test
+    void assertGetExistedItemContext() {
+        PipelineJobItemContext jobItemContext = 
mock(PipelineJobItemContext.class);
+        PipelineTasksRunner tasksRunner = mock(PipelineTasksRunner.class);
+        when(tasksRunner.getJobItemContext()).thenReturn(jobItemContext);
+        
when(job.getTasksRunner(anyInt())).thenReturn(Optional.of(tasksRunner));
+        Optional<PipelineJobItemContext> actual = 
PipelineJobRegistry.getItemContext("foo_job", 1);
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), is(jobItemContext));
+    }
+    
+    @Test
+    void assertGetNotExistedItemContext() {
+        assertThat(PipelineJobRegistry.getItemContext("bar_job", 1), 
is(Optional.empty()));
+    }
+    
+    @Test
+    void assertGetExistedShardingItems() {
+        when(job.getShardingItems()).thenReturn(Arrays.asList(1, 2, 3));
+        assertThat(PipelineJobRegistry.getShardingItems("foo_job"), 
is(Arrays.asList(1, 2, 3)));
+    }
+    
+    @Test
+    void assertGetNotExistedShardingItems() {
+        assertThat(PipelineJobRegistry.getShardingItems("bar_job"), 
is(Collections.emptyList()));
+    }
+}
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 1d24377347c..f3ebcbadaf1 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -55,7 +55,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Dumper
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
@@ -187,7 +187,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
-        PipelineJobCenter.stop(jobId);
+        PipelineJobRegistry.stop(jobId);
         jobAPI.disable(jobId);
     }
     
@@ -268,7 +268,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 CDCSocketSink cdcSink = (CDCSocketSink) 
jobItemContext.getSink();
                 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", 
"", throwable.getMessage()));
             }
-            PipelineJobCenter.stop(jobId);
+            PipelineJobRegistry.stop(jobId);
             jobAPI.disable(jobId);
         }
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 4f26e82eca2..eec99b71fbc 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -48,7 +48,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
@@ -213,7 +213,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      */
     public void start(final String jobId, final PipelineSink sink) {
         CDCJob job = new CDCJob(jobId, sink);
-        PipelineJobCenter.add(jobId, job);
+        PipelineJobRegistry.add(jobId, job);
         enable(jobId);
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfigPOJO.toJobConfiguration());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 98d430ec3fd..59fe991d7ce 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -41,7 +41,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumpe
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
@@ -92,7 +92,7 @@ public final class CDCJobPreparer {
             jobItemManager.persistProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
-            PipelineJobCenter.stop(jobItemContext.getJobId());
+            PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
         }
         initIncrementalPosition(jobItemContext);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index dc372106477..e9f44259205 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -47,7 +47,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
@@ -134,8 +134,8 @@ public final class CDCBackendHandler {
     public void startStreaming(final String jobId, final CDCConnectionContext 
connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = 
jobConfigManager.getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new 
PipelineJobNotFoundException(jobId));
-        if (PipelineJobCenter.isExisting(jobId)) {
-            PipelineJobCenter.stop(jobId);
+        if (PipelineJobRegistry.isExisting(jobId)) {
+            PipelineJobRegistry.stop(jobId);
         }
         ShardingSphereDatabase database = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
         jobAPI.start(jobId, new CDCSocketSink(channel, database, 
cdcJobConfig.getSchemaTableNames()));
@@ -153,13 +153,13 @@ public final class CDCBackendHandler {
             log.warn("job id is null or empty, ignored");
             return;
         }
-        CDCJob job = (CDCJob) PipelineJobCenter.get(jobId);
+        CDCJob job = (CDCJob) PipelineJobRegistry.get(jobId);
         if (null == job) {
             return;
         }
         if (job.getSink().identifierMatched(channelId)) {
             log.info("close CDC job, channel id: {}", channelId);
-            PipelineJobCenter.stop(jobId);
+            PipelineJobRegistry.stop(jobId);
             jobAPI.disable(jobId);
         }
     }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 39b2a1041dc..6dc4aaf4583 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -43,7 +43,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChann
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -97,12 +97,12 @@ public final class MigrationJobPreparer {
                 () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration"));
         
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(),
 Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
-            PipelineJobCenter.stop(jobItemContext.getJobId());
+            PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
         }
         prepareAndCheckTargetWithLock(jobItemContext);
         if (jobItemContext.isStopping()) {
-            PipelineJobCenter.stop(jobItemContext.getJobId());
+            PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
         }
         boolean isIncrementalSupported = 
PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType());
@@ -113,7 +113,7 @@ public final class MigrationJobPreparer {
         if (isIncrementalSupported) {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
-                PipelineJobCenter.stop(jobItemContext.getJobId());
+                PipelineJobRegistry.stop(jobItemContext.getJobId());
                 return;
             }
         }

Reply via email to