This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 260635d8fc4 Remove AbstractPipelineJob.jobType and jobId (#29344)
260635d8fc4 is described below
commit 260635d8fc488eaa85609de03f9a03a24da6ac2d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 23:17:11 2023 +0800
Remove AbstractPipelineJob.jobType and jobId (#29344)
---
.../core/job/AbstractInseparablePipelineJob.java | 5 -----
.../data/pipeline/core/job/AbstractPipelineJob.java | 20 +++-----------------
.../core/job/AbstractSeparablePipelineJob.java | 4 ----
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 3 +--
.../data/pipeline/cdc/api/CDCJobAPI.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 4 ----
...istencyCheckJobConfigurationChangedProcessor.java | 2 +-
.../pipeline/scenario/migration/MigrationJob.java | 3 +--
.../MigrationJobConfigurationChangedProcessor.java | 2 +-
.../consistencycheck/ConsistencyCheckJobTest.java | 2 +-
10 files changed, 9 insertions(+), 38 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 1566479ee33..31c4622c1d5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -43,10 +43,6 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
public abstract class AbstractInseparablePipelineJob<T extends
PipelineJobItemContext> extends AbstractPipelineJob {
- protected AbstractInseparablePipelineJob(final String jobId) {
- super(jobId);
- }
-
@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
@@ -130,7 +126,6 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
}
private void executeIncrementalTasks(final PipelineJobType jobType, final
Collection<T> jobItemContexts) {
- log.info("Execute incremental tasks, jobId={}", getJobId());
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (T each : jobItemContexts) {
if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 75b09d70ad4..cc07cae08ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -17,12 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.job;
-import lombok.AccessLevel;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -30,7 +27,6 @@ import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarr
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.ArrayList;
@@ -49,23 +45,12 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;
- @Getter
- private final String jobId;
-
- @Getter(AccessLevel.PROTECTED)
- private final PipelineJobType jobType;
-
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final AtomicReference<JobBootstrap> jobBootstrap = new
AtomicReference<>();
private final Map<Integer, PipelineTasksRunner> tasksRunners = new
ConcurrentHashMap<>();
- protected AbstractPipelineJob(final String jobId) {
- this.jobId = jobId;
- jobType = TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType());
- }
-
/**
* Is stopping.
*
@@ -107,16 +92,17 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
@Override
public final void stop() {
+ Optional<String> jobId =
tasksRunners.values().stream().findFirst().map(each ->
each.getJobItemContext().getJobId());
try {
stopping.set(true);
log.info("Stop tasks runner, jobId={}", jobId);
tasksRunners.values().forEach(PipelineTasksRunner::stop);
- awaitJobStopped(jobId);
+ jobId.ifPresent(this::awaitJobStopped);
if (null != jobBootstrap.get()) {
jobBootstrap.get().shutdown();
}
} finally {
- PipelineJobProgressPersistService.remove(jobId);
+ jobId.ifPresent(PipelineJobProgressPersistService::remove);
tasksRunners.values().stream().map(each ->
each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close);
clean();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 2c83e033062..46044d11a27 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -37,10 +37,6 @@ import java.sql.SQLException;
@Slf4j
public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobItemContext> extends AbstractPipelineJob {
- protected AbstractSeparablePipelineJob(final String jobId) {
- super(jobId);
- }
-
@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 133ac166992..2e782170459 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -87,8 +87,7 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobItemConte
private final CDCJobPreparer jobPreparer;
- public CDCJob(final String jobId, final PipelineSink sink) {
- super(jobId);
+ public CDCJob(final PipelineSink sink) {
this.sink = sink;
jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
jobItemManager = new PipelineJobItemManager<>(new
CDCJobType().getYamlJobItemProgressSwapper());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index eec99b71fbc..4d1409798c1 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -212,7 +212,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
* @param sink sink
*/
public void start(final String jobId, final PipelineSink sink) {
- CDCJob job = new CDCJob(jobId, sink);
+ CDCJob job = new CDCJob(sink);
PipelineJobRegistry.add(jobId, job);
enable(jobId);
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index df48a9815cc..5eedb8770e4 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -35,10 +35,6 @@ import java.util.Optional;
*/
public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
- public ConsistencyCheckJob(final String jobId) {
- super(jobId);
- }
-
@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final
ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index b41748418a6..954e63fe640 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -37,7 +37,7 @@ public final class
ConsistencyCheckJobConfigurationChangedProcessor extends Abst
@Override
protected AbstractPipelineJob buildPipelineJob(final String jobId) {
- return new ConsistencyCheckJob(jobId);
+ return new ConsistencyCheckJob();
}
@Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 9ed45a30c28..56a4463de5a 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -74,8 +74,7 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJo
// Shared by all sharding items
private final MigrationJobPreparer jobPreparer;
- public MigrationJob(final String jobId) {
- super(jobId);
+ public MigrationJob() {
jobItemManager = new PipelineJobItemManager<>(new
MigrationJobType().getYamlJobItemProgressSwapper());
processConfigPersistService = new
PipelineProcessConfigurationPersistService();
dataSourceManager = new DefaultPipelineDataSourceManager();
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index e8c2c0b0240..eb7266b676e 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -40,7 +40,7 @@ public final class MigrationJobConfigurationChangedProcessor
extends AbstractJob
@Override
protected AbstractPipelineJob buildPipelineJob(final String jobId) {
- return new MigrationJob(jobId);
+ return new MigrationJob();
}
@Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 3b91f6ffc0e..a1aa5dd694b 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -54,7 +54,7 @@ class ConsistencyCheckJobTest {
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
- ConsistencyCheckJob consistencyCheckJob = new
ConsistencyCheckJob(checkJobId);
+ ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
ConsistencyCheckJobItemContext actual =
consistencyCheckJob.buildJobItemContext(
new ShardingContext(checkJobId, "", 1,
YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0,
""));
assertThat(actual.getProgressContext().getSourceTableCheckPositions(),
is(expectTableCheckPosition));