This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 322e8e7f579 Refactor AbstractSimplePipelineJob (#29334)
322e8e7f579 is described below

commit 322e8e7f5796300c980c042c26e65ff29e05a9e2
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 01:41:15 2023 +0800

    Refactor AbstractSimplePipelineJob (#29334)
---
 .../pipeline/core/job/AbstractPipelineJob.java     |  8 +++----
 .../core/job/AbstractSimplePipelineJob.java        | 19 +++++-----------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 26 ++++++++++------------
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  4 +---
 4 files changed, 23 insertions(+), 34 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 5f0aa95d680..910e7ceaff2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -99,16 +99,16 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     protected abstract void doPrepare(PipelineJobItemContext jobItemContext) 
throws SQLException;
     
     @Override
-    public Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
+    public final Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
         return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
     }
     
     @Override
-    public Collection<Integer> getShardingItems() {
+    public final Collection<Integer> getShardingItems() {
         return new ArrayList<>(tasksRunnerMap.keySet());
     }
     
-    protected boolean addTasksRunner(final int shardingItem, final 
PipelineTasksRunner tasksRunner) {
+    protected final boolean addTasksRunner(final int shardingItem, final 
PipelineTasksRunner tasksRunner) {
         if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
             log.warn("shardingItem {} tasks runner exists, ignore", 
shardingItem);
             return false;
@@ -120,7 +120,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     }
     
     @Override
-    public void stop() {
+    public final void stop() {
         try {
             innerStop();
         } finally {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index ed04cabdf59..96861144eb8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -37,16 +37,6 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
         super(jobId);
     }
     
-    /**
-     * Build pipeline job item context.
-     * 
-     * @param shardingContext sharding context
-     * @return pipeline job item context
-     */
-    protected abstract PipelineJobItemContext 
buildPipelineJobItemContext(ShardingContext shardingContext);
-    
-    protected abstract PipelineTasksRunner 
buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
-    
     @Override
     public void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
@@ -57,8 +47,7 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
             return;
         }
         try {
-            PipelineJobItemContext jobItemContext = 
buildPipelineJobItemContext(shardingContext);
-            execute0(jobItemContext);
+            execute(buildPipelineJobItemContext(shardingContext));
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -67,7 +56,7 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
         }
     }
     
-    private void execute0(final PipelineJobItemContext jobItemContext) {
+    private void execute(final PipelineJobItemContext jobItemContext) {
         String jobId = jobItemContext.getJobId();
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = 
buildPipelineTasksRunner(jobItemContext);
@@ -80,6 +69,10 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
         tasksRunner.start();
     }
     
+    protected abstract PipelineJobItemContext 
buildPipelineJobItemContext(ShardingContext shardingContext);
+    
+    protected abstract PipelineTasksRunner 
buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+    
     private void processFailed(final PipelineJobManager jobManager, final 
String jobId, final int shardingItem, final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index f3ebcbadaf1..b8999fb58bb 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -31,9 +31,6 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketS
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
@@ -43,26 +40,28 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -115,8 +114,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 return;
             }
             CDCJobItemContext jobItemContext = 
buildCDCJobItemContext(jobConfig, shardingItem);
-            PipelineTasksRunner tasksRunner = new 
CDCTasksRunner(jobItemContext);
-            if (!addTasksRunner(shardingItem, tasksRunner)) {
+            if (!addTasksRunner(shardingItem, new 
CDCTasksRunner(jobItemContext))) {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index e9f44259205..cfad071beaf 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -134,9 +134,7 @@ public final class CDCBackendHandler {
     public void startStreaming(final String jobId, final CDCConnectionContext 
connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = 
jobConfigManager.getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new 
PipelineJobNotFoundException(jobId));
-        if (PipelineJobRegistry.isExisting(jobId)) {
-            PipelineJobRegistry.stop(jobId);
-        }
+        PipelineJobRegistry.stop(jobId);
         ShardingSphereDatabase database = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
         jobAPI.start(jobId, new CDCSocketSink(channel, database, 
cdcJobConfig.getSchemaTableNames()));
         connectionContext.setJobId(jobId);

Reply via email to the mailing list.