This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 85d38e41078 Remove usage of AbstractPipelineJob.jobType (#29343)
85d38e41078 is described below
commit 85d38e410788e014b8837d39ec83e37ceb7977a2
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 22:47:46 2023 +0800
Remove usage of AbstractPipelineJob.jobType (#29343)
* Refactor GenericSQLRewriteEngineTest
* Remove usage of AbstractPipelineJob.jobType
---
.../engine/GenericSQLRewriteEngineTest.java | 2 +-
.../core/job/AbstractInseparablePipelineJob.java | 22 +++++++++++-----------
.../core/job/AbstractSeparablePipelineJob.java | 6 +++---
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 4 ++--
.../consistencycheck/ConsistencyCheckJob.java | 4 +---
5 files changed, 18 insertions(+), 20 deletions(-)
diff --git
a/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/GenericSQLRewriteEngineTest.java
b/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/GenericSQLRewriteEngineTest.java
index a753669d530..fa1cff3f0e9 100644
---
a/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/GenericSQLRewriteEngineTest.java
+++
b/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/GenericSQLRewriteEngineTest.java
@@ -54,7 +54,7 @@ class GenericSQLRewriteEngineTest {
when(database.getResourceMetaData().getStorageUnits()).thenReturn(storageUnits);
CommonSQLStatementContext sqlStatementContext =
mock(CommonSQLStatementContext.class);
when(sqlStatementContext.getDatabaseType()).thenReturn(databaseType);
- QueryContext queryContext = mock(QueryContext.class);
+ QueryContext queryContext = mock(QueryContext.class,
RETURNS_DEEP_STUBS);
when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext);
GenericSQLRewriteResult actual = new GenericSQLRewriteEngine(rule,
database, mock(RuleMetaData.class))
.rewrite(new SQLRewriteContext(database, sqlStatementContext,
"SELECT 1", Collections.emptyList(), mock(ConnectionContext.class),
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 9dcde50e8e2..1566479ee33 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfig
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -42,18 +43,16 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
public abstract class AbstractInseparablePipelineJob<T extends
PipelineJobItemContext> extends AbstractPipelineJob {
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager;
-
protected AbstractInseparablePipelineJob(final String jobId) {
super(jobId);
- jobItemManager = new
PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
}
@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
log.info("Execute job {}", jobId);
- PipelineJobConfiguration jobConfig =
getJobType().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
+ PipelineJobConfiguration jobConfig =
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Collection<T> jobItemContexts = new LinkedList<>();
for (int shardingItem = 0; shardingItem <
jobConfig.getJobShardingCount(); shardingItem++) {
if (isStopping()) {
@@ -73,8 +72,8 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
return;
}
prepare(jobItemContexts);
- executeInventoryTasks(jobItemContexts);
- executeIncrementalTasks(jobItemContexts);
+ executeInventoryTasks(jobType, jobItemContexts);
+ executeIncrementalTasks(jobType, jobItemContexts);
}
protected abstract T buildJobItemContext(PipelineJobConfiguration
jobConfig, int shardingItem);
@@ -105,10 +104,10 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
protected abstract void processFailed(String jobId);
- private void executeInventoryTasks(final Collection<T> jobItemContexts) {
+ private void executeInventoryTasks(final PipelineJobType jobType, final
Collection<T> jobItemContexts) {
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (T each : jobItemContexts) {
- updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
+ updateJobItemStatus(each, jobType,
JobStatus.EXECUTE_INVENTORY_TASK);
for (PipelineTask task : ((TransmissionJobItemContext)
each).getInventoryTasks()) {
if (task.getTaskProgress().getPosition() instanceof
FinishedPosition) {
continue;
@@ -124,12 +123,13 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
protected abstract void
executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<T>
jobItemContexts);
- private void updateJobItemStatus(final T jobItemContext, final JobStatus
jobStatus) {
+ private void updateJobItemStatus(final T jobItemContext, final
PipelineJobType jobType, final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
}
- private void executeIncrementalTasks(final Collection<T> jobItemContexts) {
+ private void executeIncrementalTasks(final PipelineJobType jobType, final
Collection<T> jobItemContexts) {
log.info("Execute incremental tasks, jobId={}", getJobId());
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (T each : jobItemContexts) {
@@ -137,7 +137,7 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
log.info("job status already EXECUTE_INCREMENTAL_TASK,
ignore");
return;
}
- updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
+ updateJobItemStatus(each, jobType,
JobStatus.EXECUTE_INCREMENTAL_TASK);
for (PipelineTask task : ((TransmissionJobItemContext)
each).getIncrementalTasks()) {
if (task.getTaskProgress().getPosition() instanceof
FinishedPosition) {
continue;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index ac5e44b1fe0..2c83e033062 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -55,7 +55,7 @@ public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobItemCont
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
- processFailed(new PipelineJobManager(getJobType()), jobId,
shardingItem, ex);
+ processFailed(jobId, shardingItem, ex);
throw ex;
}
}
@@ -89,11 +89,11 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobItemCont
protected abstract void doPrepare(T jobItemContext) throws SQLException;
- private void processFailed(final PipelineJobManager jobManager, final
String jobId, final int shardingItem, final Exception ex) {
+ private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
try {
- jobManager.stop(jobId);
+ new
PipelineJobManager(PipelineJobIdUtils.parseJobType(jobId)).stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index a9854bfd848..133ac166992 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -91,7 +91,7 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobItemConte
super(jobId);
this.sink = sink;
jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
- jobItemManager = new
PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
+ jobItemManager = new PipelineJobItemManager<>(new
CDCJobType().getYamlJobItemProgressSwapper());
processConfigPersistService = new
PipelineProcessConfigurationPersistService();
dataSourceManager = new DefaultPipelineDataSourceManager();
jobPreparer = new CDCJobPreparer();
@@ -101,7 +101,7 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobItemConte
protected CDCJobItemContext buildJobItemContext(final
PipelineJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
getJobType().getType()));
+
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
"STREAMING"));
TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig =
buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext((CDCJobConfiguration) jobConfig,
shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig,
dataSourceManager, sink);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 61aa3e775fc..df48a9815cc 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
@@ -34,7 +33,6 @@ import java.util.Optional;
/**
* Consistency check job.
*/
-@Slf4j
public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
public ConsistencyCheckJob(final String jobId) {
@@ -44,7 +42,7 @@ public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<Cons
@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final
ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(new
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobConfig.getJobId(),
shardingContext.getShardingItem());
return new ConsistencyCheckJobItemContext(jobConfig,
shardingContext.getShardingItem(), JobStatus.RUNNING,
jobItemProgress.orElse(null));
}