This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e989837997a Use PipelineJobRunnerManager instead of
AbstractPipelineJob (#29345)
e989837997a is described below
commit e989837997a582e3b8e8d3845d1e8e932a52669f
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 10 13:44:41 2023 +0800
Use PipelineJobRunnerManager instead of AbstractPipelineJob (#29345)
---
.../core/job/AbstractInseparablePipelineJob.java | 13 ++++--
.../core/job/AbstractSeparablePipelineJob.java | 13 ++++--
.../data/pipeline/core/job/PipelineJob.java | 26 +++--------
.../pipeline/core/job/PipelineJobRegistry.java | 6 +--
.../PipelineJobRunnerCleaner.java} | 31 +++----------
.../PipelineJobRunnerManager.java} | 52 ++++++++++++++++------
.../AbstractJobConfigurationChangedProcessor.java | 14 +++---
.../pipeline/core/job/PipelineJobRegistryTest.java | 9 ++--
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 13 ++----
.../data/pipeline/cdc/api/CDCJobAPI.java | 2 +-
.../pipeline/cdc/engine/CDCJobRunnerCleaner.java} | 41 +++++++----------
.../consistencycheck/ConsistencyCheckJob.java | 9 ++--
...tencyCheckJobConfigurationChangedProcessor.java | 4 +-
.../pipeline/scenario/migration/MigrationJob.java | 8 ++--
.../engine/MigrationJobRunnerCleaner.java} | 37 +++++----------
.../MigrationJobConfigurationChangedProcessor.java | 6 +--
16 files changed, 129 insertions(+), 155 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 31c4622c1d5..09af3e3d7a5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -17,12 +17,15 @@
package org.apache.shardingsphere.data.pipeline.core.job;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -40,8 +43,12 @@ import java.util.concurrent.CompletableFuture;
*
* @param <T> type of pipeline job item context
*/
+@RequiredArgsConstructor
+@Getter
@Slf4j
-public abstract class AbstractInseparablePipelineJob<T extends
PipelineJobItemContext> extends AbstractPipelineJob {
+public abstract class AbstractInseparablePipelineJob<T extends
PipelineJobItemContext> implements PipelineJob {
+
+ private final PipelineJobRunnerManager jobRunnerManager;
@Override
public final void execute(final ShardingContext shardingContext) {
@@ -51,12 +58,12 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
PipelineJobConfiguration jobConfig =
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Collection<T> jobItemContexts = new LinkedList<>();
for (int shardingItem = 0; shardingItem <
jobConfig.getJobShardingCount(); shardingItem++) {
- if (isStopping()) {
+ if (jobRunnerManager.isStopping()) {
log.info("Stopping true, ignore");
return;
}
T jobItemContext = buildJobItemContext(jobConfig, shardingItem);
- if (!addTasksRunner(shardingItem,
buildTasksRunner(jobItemContext))) {
+ if (!jobRunnerManager.addTasksRunner(shardingItem,
buildTasksRunner(jobItemContext))) {
continue;
}
jobItemContexts.add(jobItemContext);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 46044d11a27..ff1d04515da 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -17,11 +17,14 @@
package org.apache.shardingsphere.data.pipeline.core.job;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -34,15 +37,19 @@ import java.sql.SQLException;
*
* @param <T> type of pipeline job item context
*/
+@RequiredArgsConstructor
+@Getter
@Slf4j
-public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobItemContext> extends AbstractPipelineJob {
+public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobItemContext> implements PipelineJob {
+
+ private final PipelineJobRunnerManager jobRunnerManager;
@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
- if (isStopping()) {
+ if (jobRunnerManager.isStopping()) {
log.info("Stopping true, ignore");
return;
}
@@ -60,7 +67,7 @@ public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobItemCont
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
- if (!addTasksRunner(shardingItem, tasksRunner)) {
+ if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
return;
}
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
index e71792d6f9a..a8368a566bf 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
@@ -17,34 +17,18 @@
package org.apache.shardingsphere.data.pipeline.core.job;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import java.util.Collection;
-import java.util.Optional;
-
/**
* Pipeline job.
*/
public interface PipelineJob extends SimpleJob {
/**
- * Get tasks runner.
- *
- * @param shardingItem sharding item
- * @return tasks runner
- */
- Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
-
- /**
- * Get sharding items.
- *
- * @return sharding items
- */
- Collection<Integer> getShardingItems();
-
- /**
- * Stop job.
+ * Get pipeline job runner manager.
+ *
+ * @return pipeline job runner manager
*/
- void stop();
+ PipelineJobRunnerManager getJobRunnerManager();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
index 7d812cc1c21..f1660e2714e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistry.java
@@ -76,7 +76,7 @@ public final class PipelineJobRegistry {
if (null == job) {
return;
}
- job.stop();
+ job.getJobRunnerManager().stop();
JOBS.remove(jobId);
}
@@ -88,7 +88,7 @@ public final class PipelineJobRegistry {
* @return pipeline job item context
*/
public static Optional<PipelineJobItemContext> getItemContext(final String
jobId, final int shardingItem) {
- return JOBS.containsKey(jobId) ?
JOBS.get(jobId).getTasksRunner(shardingItem).map(PipelineTasksRunner::getJobItemContext)
: Optional.empty();
+ return JOBS.containsKey(jobId) ?
JOBS.get(jobId).getJobRunnerManager().getTasksRunner(shardingItem).map(PipelineTasksRunner::getJobItemContext)
: Optional.empty();
}
/**
@@ -98,6 +98,6 @@ public final class PipelineJobRegistry {
* @return sharding items
*/
public static Collection<Integer> getShardingItems(final String jobId) {
- return JOBS.containsKey(jobId) ? JOBS.get(jobId).getShardingItems() :
Collections.emptyList();
+ return JOBS.containsKey(jobId) ?
JOBS.get(jobId).getJobRunnerManager().getShardingItems() :
Collections.emptyList();
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java
similarity index 54%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java
index e71792d6f9a..a0a572c2629 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerCleaner.java
@@ -15,36 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job;
-
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-
-import java.util.Collection;
-import java.util.Optional;
+package org.apache.shardingsphere.data.pipeline.core.job.engine;
/**
- * Pipeline job.
+ * Pipeline job runner cleaner.
*/
-public interface PipelineJob extends SimpleJob {
-
- /**
- * Get tasks runner.
- *
- * @param shardingItem sharding item
- * @return tasks runner
- */
- Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
-
- /**
- * Get sharding items.
- *
- * @return sharding items
- */
- Collection<Integer> getShardingItems();
+public interface PipelineJobRunnerCleaner {
/**
- * Stop job.
+ * Clean pipeline job.
*/
- void stop();
+ void clean();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
similarity index 82%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
index cc07cae08ae..d3bd6879c90 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job;
+package org.apache.shardingsphere.data.pipeline.core.job.engine;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -38,10 +39,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Abstract pipeline job.
+ * Pipeline job runner manager.
*/
+@RequiredArgsConstructor
@Slf4j
-public abstract class AbstractPipelineJob implements PipelineJob {
+public final class PipelineJobRunnerManager {
private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;
@@ -51,6 +53,12 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private final Map<Integer, PipelineTasksRunner> tasksRunners = new
ConcurrentHashMap<>();
+ private final PipelineJobRunnerCleaner cleaner;
+
+ public PipelineJobRunnerManager() {
+ this(null);
+ }
+
/**
* Is stopping.
*
@@ -69,17 +77,33 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
this.jobBootstrap.set(jobBootstrap);
}
- @Override
- public final Optional<PipelineTasksRunner> getTasksRunner(final int
shardingItem) {
+ /**
+ * Get tasks runner.
+ *
+ * @param shardingItem sharding item
+ * @return tasks runner
+ */
+ public Optional<PipelineTasksRunner> getTasksRunner(final int
shardingItem) {
return Optional.ofNullable(tasksRunners.get(shardingItem));
}
- @Override
- public final Collection<Integer> getShardingItems() {
+ /**
+ * Get sharding items.
+ *
+ * @return sharding items
+ */
+ public Collection<Integer> getShardingItems() {
return new ArrayList<>(tasksRunners.keySet());
}
- protected final boolean addTasksRunner(final int shardingItem, final
PipelineTasksRunner tasksRunner) {
+ /**
+ * Add tasks runner.
+ *
+ * @param shardingItem sharding item
+ * @param tasksRunner tasks runner
+ * @return add success or not
+ */
+ public boolean addTasksRunner(final int shardingItem, final
PipelineTasksRunner tasksRunner) {
if (null != tasksRunners.putIfAbsent(shardingItem, tasksRunner)) {
log.warn("shardingItem {} tasks runner exists, ignore",
shardingItem);
return false;
@@ -90,8 +114,10 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
return true;
}
- @Override
- public final void stop() {
+ /**
+ * Stop job.
+ */
+ public void stop() {
Optional<String> jobId =
tasksRunners.values().stream().findFirst().map(each ->
each.getJobItemContext().getJobId());
try {
stopping.set(true);
@@ -104,7 +130,9 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
} finally {
jobId.ifPresent(PipelineJobProgressPersistService::remove);
tasksRunners.values().stream().map(each ->
each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close);
- clean();
+ if (null != cleaner) {
+ cleaner.clean();
+ }
}
}
@@ -128,6 +156,4 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
spentMills += sleepMillis;
}
}
-
- protected abstract void clean();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index f128bb4d6b3..38c7b9cfad1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -18,14 +18,14 @@
package
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
@@ -80,14 +80,14 @@ public abstract class
AbstractJobConfigurationChangedProcessor implements JobCon
protected void executeJob(final JobConfiguration jobConfig) {
String jobId = jobConfig.getJobName();
- AbstractPipelineJob job = buildPipelineJob(jobId);
+ PipelineJob job = buildPipelineJob(jobId);
PipelineJobRegistry.add(jobId, job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
job, jobConfig);
- job.setJobBootstrap(oneOffJobBootstrap);
+ job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
- protected abstract AbstractPipelineJob buildPipelineJob(String jobId);
+ protected abstract PipelineJob buildPipelineJob(String jobId);
protected abstract PipelineJobType getJobType();
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
index adbca271a20..0ca79713d4e 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobRegistryTest.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -42,7 +43,7 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class PipelineJobRegistryTest {
- @Mock
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
private PipelineJob job;
@BeforeEach
@@ -75,7 +76,7 @@ class PipelineJobRegistryTest {
@Test
void assertStop() {
PipelineJobRegistry.stop("foo_job");
- verify(job).stop();
+ verify(job.getJobRunnerManager()).stop();
assertFalse(PipelineJobRegistry.isExisting("foo_job"));
}
@@ -84,7 +85,7 @@ class PipelineJobRegistryTest {
PipelineJobItemContext jobItemContext =
mock(PipelineJobItemContext.class);
PipelineTasksRunner tasksRunner = mock(PipelineTasksRunner.class);
when(tasksRunner.getJobItemContext()).thenReturn(jobItemContext);
-
when(job.getTasksRunner(anyInt())).thenReturn(Optional.of(tasksRunner));
+
when(job.getJobRunnerManager().getTasksRunner(anyInt())).thenReturn(Optional.of(tasksRunner));
Optional<PipelineJobItemContext> actual =
PipelineJobRegistry.getItemContext("foo_job", 1);
assertTrue(actual.isPresent());
assertThat(actual.get(), is(jobItemContext));
@@ -97,7 +98,7 @@ class PipelineJobRegistryTest {
@Test
void assertGetExistedShardingItems() {
- when(job.getShardingItems()).thenReturn(Arrays.asList(1, 2, 3));
+
when(job.getJobRunnerManager().getShardingItems()).thenReturn(Arrays.asList(1,
2, 3));
assertThat(PipelineJobRegistry.getShardingItems("foo_job"),
is(Arrays.asList(1, 2, 3)));
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 2e782170459..fdaed744821 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -29,13 +29,13 @@ import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
+import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
@@ -48,6 +48,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -59,7 +60,6 @@ import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAl
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.Collection;
import java.util.Map;
@@ -83,11 +83,12 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobItemConte
private final PipelineProcessConfigurationPersistService
processConfigPersistService;
- private final PipelineDataSourceManager dataSourceManager;
+ private final DefaultPipelineDataSourceManager dataSourceManager;
private final CDCJobPreparer jobPreparer;
public CDCJob(final PipelineSink sink) {
+ super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
this.sink = sink;
jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
jobItemManager = new PipelineJobItemManager<>(new
CDCJobType().getYamlJobItemProgressSwapper());
@@ -157,12 +158,6 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobItemConte
ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental",
jobItemContexts.iterator().next()));
}
- @Override
- protected void clean() {
- dataSourceManager.close();
- QuietlyCloser.close(sink);
- }
-
@RequiredArgsConstructor
private final class CDCExecuteCallback implements ExecuteCallback {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 4d1409798c1..0398be69743 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -217,7 +217,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
enable(jobId);
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
job, jobConfigPOJO.toJobConfiguration());
- job.setJobBootstrap(oneOffJobBootstrap);
+ job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
similarity index 50%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
copy to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
index e71792d6f9a..e02c3343438 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/engine/CDCJobRunnerCleaner.java
@@ -15,36 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job;
+package org.apache.shardingsphere.data.pipeline.cdc.engine;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-
-import java.util.Collection;
-import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerCleaner;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
/**
- * Pipeline job.
+ * CDC job runner cleaner.
*/
-public interface PipelineJob extends SimpleJob {
+@RequiredArgsConstructor
+public final class CDCJobRunnerCleaner implements PipelineJobRunnerCleaner {
- /**
- * Get tasks runner.
- *
- * @param shardingItem sharding item
- * @return tasks runner
- */
- Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
+ private final DefaultPipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
- /**
- * Get sharding items.
- *
- * @return sharding items
- */
- Collection<Integer> getShardingItems();
+ private final PipelineSink sink;
- /**
- * Stop job.
- */
- void stop();
+ @Override
+ public void clean() {
+ dataSourceManager.close();
+ QuietlyCloser.close(sink);
+ }
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 5eedb8770e4..7cf1a25af59 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -35,6 +36,10 @@ import java.util.Optional;
*/
public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
+ public ConsistencyCheckJob() {
+ super(new PipelineJobRunnerManager());
+ }
+
@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final
ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
@@ -51,8 +56,4 @@ public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<Cons
@Override
protected void doPrepare(final ConsistencyCheckJobItemContext
jobItemContext) {
}
-
- @Override
- protected void clean() {
- }
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index 954e63fe640..e57d0066a76 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -18,9 +18,9 @@
package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
@@ -36,7 +36,7 @@ public final class
ConsistencyCheckJobConfigurationChangedProcessor extends Abst
}
@Override
- protected AbstractPipelineJob buildPipelineJob(final String jobId) {
+ protected PipelineJob buildPipelineJob(final String jobId) {
return new ConsistencyCheckJob();
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 56a4463de5a..0e6d74270fb 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfigurati
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -45,6 +46,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.engine.MigrationJobRunnerCleaner;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -75,6 +77,7 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJo
private final MigrationJobPreparer jobPreparer;
public MigrationJob() {
+ super(new PipelineJobRunnerManager(new MigrationJobRunnerCleaner()));
jobItemManager = new PipelineJobItemManager<>(new
MigrationJobType().getYamlJobItemProgressSwapper());
processConfigPersistService = new
PipelineProcessConfigurationPersistService();
dataSourceManager = new DefaultPipelineDataSourceManager();
@@ -135,9 +138,4 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJo
protected void doPrepare(final MigrationJobItemContext jobItemContext)
throws SQLException {
jobPreparer.prepare(jobItemContext);
}
-
- @Override
- public void clean() {
- dataSourceManager.close();
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java
similarity index 54%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
copy to
kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java
index e71792d6f9a..48400463c6f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/engine/MigrationJobRunnerCleaner.java
@@ -15,36 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.engine;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-
-import java.util.Collection;
-import java.util.Optional;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerCleaner;
/**
- * Pipeline job.
+ * Migration job runner cleaner.
*/
-public interface PipelineJob extends SimpleJob {
-
- /**
- * Get tasks runner.
- *
- * @param shardingItem sharding item
- * @return tasks runner
- */
- Optional<PipelineTasksRunner> getTasksRunner(int shardingItem);
+public final class MigrationJobRunnerCleaner implements
PipelineJobRunnerCleaner {
- /**
- * Get sharding items.
- *
- * @return sharding items
- */
- Collection<Integer> getShardingItems();
+ private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
- /**
- * Stop job.
- */
- void stop();
+ @Override
+ public void clean() {
+ dataSourceManager.close();
+ }
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index eb7266b676e..f9e57077ac5 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -18,13 +18,13 @@
package
org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
/**
@@ -39,7 +39,7 @@ public final class MigrationJobConfigurationChangedProcessor
extends AbstractJob
}
@Override
- protected AbstractPipelineJob buildPipelineJob(final String jobId) {
+ protected PipelineJob buildPipelineJob(final String jobId) {
return new MigrationJob();
}