This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 322e8e7f579 Refactor AbstractSimplePipelineJob (#29334)
322e8e7f579 is described below
commit 322e8e7f5796300c980c042c26e65ff29e05a9e2
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 01:41:15 2023 +0800
Refactor AbstractSimplePipelineJob (#29334)
---
.../pipeline/core/job/AbstractPipelineJob.java | 8 +++----
.../core/job/AbstractSimplePipelineJob.java | 19 +++++-----------
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 26 ++++++++++------------
.../pipeline/cdc/handler/CDCBackendHandler.java | 4 +---
4 files changed, 23 insertions(+), 34 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 5f0aa95d680..910e7ceaff2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -99,16 +99,16 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
protected abstract void doPrepare(PipelineJobItemContext jobItemContext)
throws SQLException;
@Override
- public Optional<PipelineTasksRunner> getTasksRunner(final int
shardingItem) {
+ public final Optional<PipelineTasksRunner> getTasksRunner(final int
shardingItem) {
return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
}
@Override
- public Collection<Integer> getShardingItems() {
+ public final Collection<Integer> getShardingItems() {
return new ArrayList<>(tasksRunnerMap.keySet());
}
- protected boolean addTasksRunner(final int shardingItem, final
PipelineTasksRunner tasksRunner) {
+ protected final boolean addTasksRunner(final int shardingItem, final
PipelineTasksRunner tasksRunner) {
if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
log.warn("shardingItem {} tasks runner exists, ignore",
shardingItem);
return false;
@@ -120,7 +120,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
}
@Override
- public void stop() {
+ public final void stop() {
try {
innerStop();
} finally {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index ed04cabdf59..96861144eb8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -37,16 +37,6 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
super(jobId);
}
- /**
- * Build pipeline job item context.
- *
- * @param shardingContext sharding context
- * @return pipeline job item context
- */
- protected abstract PipelineJobItemContext
buildPipelineJobItemContext(ShardingContext shardingContext);
-
- protected abstract PipelineTasksRunner
buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
-
@Override
public void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
@@ -57,8 +47,7 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
return;
}
try {
- PipelineJobItemContext jobItemContext =
buildPipelineJobItemContext(shardingContext);
- execute0(jobItemContext);
+ execute(buildPipelineJobItemContext(shardingContext));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -67,7 +56,7 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
}
}
- private void execute0(final PipelineJobItemContext jobItemContext) {
+ private void execute(final PipelineJobItemContext jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner =
buildPipelineTasksRunner(jobItemContext);
@@ -80,6 +69,10 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
tasksRunner.start();
}
+ protected abstract PipelineJobItemContext
buildPipelineJobItemContext(ShardingContext shardingContext);
+
+ protected abstract PipelineTasksRunner
buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+
private void processFailed(final PipelineJobManager jobManager, final
String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index f3ebcbadaf1..b8999fb58bb 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -31,9 +31,6 @@ import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketS
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
-import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
@@ -43,26 +40,28 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -115,8 +114,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
return;
}
CDCJobItemContext jobItemContext =
buildCDCJobItemContext(jobConfig, shardingItem);
- PipelineTasksRunner tasksRunner = new
CDCTasksRunner(jobItemContext);
- if (!addTasksRunner(shardingItem, tasksRunner)) {
+ if (!addTasksRunner(shardingItem, new
CDCTasksRunner(jobItemContext))) {
continue;
}
jobItemContexts.add(jobItemContext);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index e9f44259205..cfad071beaf 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -134,9 +134,7 @@ public final class CDCBackendHandler {
public void startStreaming(final String jobId, final CDCConnectionContext
connectionContext, final Channel channel) {
CDCJobConfiguration cdcJobConfig =
jobConfigManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new
PipelineJobNotFoundException(jobId));
- if (PipelineJobRegistry.isExisting(jobId)) {
- PipelineJobRegistry.stop(jobId);
- }
+ PipelineJobRegistry.stop(jobId);
ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
jobAPI.start(jobId, new CDCSocketSink(channel, database,
cdcJobConfig.getSchemaTableNames()));
connectionContext.setJobId(jobId);