This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d2365c1a72c Refactor AbstractPipelineJob.jobId as final (#25647)
d2365c1a72c is described below
commit d2365c1a72c646f804136d4d1c18f7bb542dffcd
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat May 13 23:38:45 2023 +0800
Refactor AbstractPipelineJob.jobId as final (#25647)
---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 +-
.../data/pipeline/cdc/core/job/CDCJob.java | 7 +++++--
.../data/pipeline/core/job/AbstractPipelineJob.java | 16 ++++++----------
.../pipeline/core/job/AbstractSimplePipelineJob.java | 5 ++++-
.../impl/AbstractChangedJobConfigurationProcessor.java | 4 ++--
.../scenario/consistencycheck/ConsistencyCheckJob.java | 4 ++++
...ChangedConsistencyCheckJobConfigurationProcessor.java | 4 ++--
.../data/pipeline/scenario/migration/MigrationJob.java | 6 ++++--
.../ChangedMigrationJobConfigurationProcessor.java | 4 ++--
.../consistencycheck/ConsistencyCheckJobTest.java | 5 +----
10 files changed, 31 insertions(+), 26 deletions(-)
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 6787dcfbe21..5ffd47a2619 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -226,7 +226,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
* @param importerConnector importer connector
*/
public void startJob(final String jobId, final ImporterConnector
importerConnector) {
- CDCJob job = new CDCJob(importerConnector);
+ CDCJob job = new CDCJob(jobId, importerConnector);
PipelineJobCenter.addJob(jobId, job);
updateJobConfigurationDisabled(jobId, false);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 62e8fe62a64..ee6ea603e61 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.job;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -43,7 +42,6 @@ import java.util.Optional;
/**
* CDC job.
*/
-@RequiredArgsConstructor
@Slf4j
public final class CDCJob extends AbstractSimplePipelineJob {
@@ -55,6 +53,11 @@ public final class CDCJob extends AbstractSimplePipelineJob {
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ public CDCJob(final String jobId, final ImporterConnector
importerConnector) {
+ super(jobId);
+ this.importerConnector = importerConnector;
+ }
+
@Override
protected void doPrepare(final PipelineJobItemContext jobItemContext) {
jobPreparer.initTasks((CDCJobItemContext) jobItemContext);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index cf6a41cb263..adb71754739 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -49,10 +49,10 @@ import java.util.concurrent.TimeUnit;
public abstract class AbstractPipelineJob implements PipelineJob {
@Getter
- private volatile String jobId;
+ private final String jobId;
@Getter(AccessLevel.PROTECTED)
- private volatile PipelineJobAPI jobAPI;
+ private final PipelineJobAPI jobAPI;
@Getter
private volatile boolean stopping;
@@ -62,7 +62,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new
ConcurrentHashMap<>();
- protected void setJobId(final String jobId) {
+ protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
jobAPI = TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getTypeName());
}
@@ -129,10 +129,8 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
for (PipelineTasksRunner each : tasksRunnerMap.values()) {
each.stop();
}
- if (null != jobId) {
- Optional<ElasticJobListener> pipelineJobListener =
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class,
PipelineElasticJobListener.class.getName());
- pipelineJobListener.ifPresent(jobListener ->
awaitJobStopped((PipelineElasticJobListener) jobListener, jobId,
TimeUnit.SECONDS.toMillis(2)));
- }
+ Optional<ElasticJobListener> pipelineJobListener =
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class,
PipelineElasticJobListener.class.getName());
+ pipelineJobListener.ifPresent(jobListener ->
awaitJobStopped((PipelineElasticJobListener) jobListener, jobId,
TimeUnit.SECONDS.toMillis(2)));
if (null != jobBootstrap) {
jobBootstrap.shutdown();
}
@@ -157,9 +155,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private void innerClean() {
tasksRunnerMap.clear();
- if (null != jobId) {
-
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
- }
+
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
}
protected abstract void doClean();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 02477826d6d..2aa8225d639 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -29,6 +29,10 @@ import
org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@Slf4j
public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob
implements SimpleJob {
+ protected AbstractSimplePipelineJob(final String jobId) {
+ super(jobId);
+ }
+
/**
* Build pipeline job item context.
*
@@ -48,7 +52,6 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
log.info("stopping true, ignore");
return;
}
- setJobId(jobId);
PipelineJobItemContext jobItemContext =
buildPipelineJobItemContext(shardingContext);
PipelineTasksRunner tasksRunner =
buildPipelineTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
index 223a73b2aa8..d48001ab41a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
@@ -79,15 +79,15 @@ public abstract class
AbstractChangedJobConfigurationProcessor implements Change
protected abstract void onDeleted(JobConfiguration jobConfig);
protected void executeJob(final JobConfiguration jobConfig) {
- AbstractPipelineJob job = buildPipelineJob();
String jobId = jobConfig.getJobName();
+ AbstractPipelineJob job = buildPipelineJob(jobId);
PipelineJobCenter.addJob(jobId, job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
job, jobConfig);
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
- protected abstract AbstractPipelineJob buildPipelineJob();
+ protected abstract AbstractPipelineJob buildPipelineJob(String jobId);
protected abstract JobType getJobType();
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 3bb0cb7909e..e04b02a0c17 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -38,6 +38,10 @@ import java.util.Optional;
@Slf4j
public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
+ public ConsistencyCheckJob(final String jobId) {
+ super(jobId);
+ }
+
@Override
public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final
ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ChangedConsistencyCheckJobConfigurationProcessor.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ChangedConsistencyCheckJobConfigurationProcessor.java
index 5d418989f7f..a6039aed8ef 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ChangedConsistencyCheckJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ChangedConsistencyCheckJobConfigurationProcessor.java
@@ -36,8 +36,8 @@ public final class
ChangedConsistencyCheckJobConfigurationProcessor extends Abst
}
@Override
- protected AbstractPipelineJob buildPipelineJob() {
- return new ConsistencyCheckJob();
+ protected AbstractPipelineJob buildPipelineJob(final String jobId) {
+ return new ConsistencyCheckJob(jobId);
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index be4c0a586b8..ff598d5142f 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
@@ -42,7 +41,6 @@ import java.util.Optional;
/**
* Migration job.
*/
-@RequiredArgsConstructor
@Slf4j
public final class MigrationJob extends AbstractSimplePipelineJob {
@@ -53,6 +51,10 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
// Shared by all sharding items
private final MigrationJobPreparer jobPreparer = new
MigrationJobPreparer();
+ public MigrationJob(final String jobId) {
+ super(jobId);
+ }
+
@Override
protected InventoryIncrementalJobItemContext
buildPipelineJobItemContext(final ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/ChangedMigrationJobConfigurationProcessor.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/ChangedMigrationJobConfigurationProcessor.java
index 7937dc8b338..de16d1ad589 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/ChangedMigrationJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/ChangedMigrationJobConfigurationProcessor.java
@@ -39,8 +39,8 @@ public final class ChangedMigrationJobConfigurationProcessor
extends AbstractCha
}
@Override
- protected AbstractPipelineJob buildPipelineJob() {
- return new MigrationJob();
+ protected AbstractPipelineJob buildPipelineJob(final String jobId) {
+ return new MigrationJob(jobId);
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 4e8969ae1f5..0dcd41a9279 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencychec
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -33,7 +32,6 @@ import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfiguratio
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.mockito.internal.configuration.plugins.Plugins;
import java.util.Collections;
import java.util.Map;
@@ -55,8 +53,7 @@ class ConsistencyCheckJobTest {
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
- ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
-
Plugins.getMemberAccessor().invoke(AbstractPipelineJob.class.getDeclaredMethod("setJobId",
String.class), consistencyCheckJob, checkJobId);
+ ConsistencyCheckJob consistencyCheckJob = new
ConsistencyCheckJob(checkJobId);
ConsistencyCheckJobItemContext actual =
consistencyCheckJob.buildPipelineJobItemContext(
new ShardingContext(checkJobId, "", 1,
YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0,
""));
assertThat(actual.getProgressContext().getTableCheckPositions(),
is(expectTableCheckPosition));