This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 85d38e41078 Remove usage of AbstractPipelineJob.jobType (#29343)
85d38e41078 is described below

commit 85d38e410788e014b8837d39ec83e37ceb7977a2
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 22:47:46 2023 +0800

    Remove usage of AbstractPipelineJob.jobType (#29343)
    
    * Refactor GenericSQLRewriteEngineTest
    
    * Remove usage of AbstractPipelineJob.jobType
---
 .../engine/GenericSQLRewriteEngineTest.java        |  2 +-
 .../core/job/AbstractInseparablePipelineJob.java   | 22 +++++++++++-----------
 .../core/job/AbstractSeparablePipelineJob.java     |  6 +++---
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  4 ++--
 .../consistencycheck/ConsistencyCheckJob.java      |  4 +---
 5 files changed, 18 insertions(+), 20 deletions(-)

diff --git 
a/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/GenericSQLRewriteEngineTest.java
 
b/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/GenericSQLRewriteEngineTest.java
index a753669d530..fa1cff3f0e9 100644
--- 
a/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/GenericSQLRewriteEngineTest.java
+++ 
b/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/GenericSQLRewriteEngineTest.java
@@ -54,7 +54,7 @@ class GenericSQLRewriteEngineTest {
         
when(database.getResourceMetaData().getStorageUnits()).thenReturn(storageUnits);
         CommonSQLStatementContext sqlStatementContext = 
mock(CommonSQLStatementContext.class);
         when(sqlStatementContext.getDatabaseType()).thenReturn(databaseType);
-        QueryContext queryContext = mock(QueryContext.class);
+        QueryContext queryContext = mock(QueryContext.class, 
RETURNS_DEEP_STUBS);
         
when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext);
         GenericSQLRewriteResult actual = new GenericSQLRewriteEngine(rule, 
database, mock(RuleMetaData.class))
                 .rewrite(new SQLRewriteContext(database, sqlStatementContext, 
"SELECT 1", Collections.emptyList(), mock(ConnectionContext.class),
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 9dcde50e8e2..1566479ee33 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfig
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -42,18 +43,16 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobItemContext> extends AbstractPipelineJob {
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager;
-    
     protected AbstractInseparablePipelineJob(final String jobId) {
         super(jobId);
-        jobItemManager = new 
PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
     }
     
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         log.info("Execute job {}", jobId);
-        PipelineJobConfiguration jobConfig = 
getJobType().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
+        PipelineJobConfiguration jobConfig = 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Collection<T> jobItemContexts = new LinkedList<>();
         for (int shardingItem = 0; shardingItem < 
jobConfig.getJobShardingCount(); shardingItem++) {
             if (isStopping()) {
@@ -73,8 +72,8 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
             return;
         }
         prepare(jobItemContexts);
-        executeInventoryTasks(jobItemContexts);
-        executeIncrementalTasks(jobItemContexts);
+        executeInventoryTasks(jobType, jobItemContexts);
+        executeIncrementalTasks(jobType, jobItemContexts);
     }
     
     protected abstract T buildJobItemContext(PipelineJobConfiguration 
jobConfig, int shardingItem);
@@ -105,10 +104,10 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
     
     protected abstract void processFailed(String jobId);
     
-    private void executeInventoryTasks(final Collection<T> jobItemContexts) {
+    private void executeInventoryTasks(final PipelineJobType jobType, final 
Collection<T> jobItemContexts) {
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (T each : jobItemContexts) {
-            updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
+            updateJobItemStatus(each, jobType, 
JobStatus.EXECUTE_INVENTORY_TASK);
             for (PipelineTask task : ((TransmissionJobItemContext) 
each).getInventoryTasks()) {
                 if (task.getTaskProgress().getPosition() instanceof 
FinishedPosition) {
                     continue;
@@ -124,12 +123,13 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
     
     protected abstract void 
executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<T> 
jobItemContexts);
     
-    private void updateJobItemStatus(final T jobItemContext, final JobStatus 
jobStatus) {
+    private void updateJobItemStatus(final T jobItemContext, final 
PipelineJobType jobType, final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
-    private void executeIncrementalTasks(final Collection<T> jobItemContexts) {
+    private void executeIncrementalTasks(final PipelineJobType jobType, final 
Collection<T> jobItemContexts) {
         log.info("Execute incremental tasks, jobId={}", getJobId());
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (T each : jobItemContexts) {
@@ -137,7 +137,7 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
                 log.info("job status already EXECUTE_INCREMENTAL_TASK, 
ignore");
                 return;
             }
-            updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
+            updateJobItemStatus(each, jobType, 
JobStatus.EXECUTE_INCREMENTAL_TASK);
             for (PipelineTask task : ((TransmissionJobItemContext) 
each).getIncrementalTasks()) {
                 if (task.getTaskProgress().getPosition() instanceof 
FinishedPosition) {
                     continue;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index ac5e44b1fe0..2c83e033062 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -55,7 +55,7 @@ public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobItemCont
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
-            processFailed(new PipelineJobManager(getJobType()), jobId, 
shardingItem, ex);
+            processFailed(jobId, shardingItem, ex);
             throw ex;
         }
     }
@@ -89,11 +89,11 @@ public abstract class AbstractSeparablePipelineJob<T 
extends PipelineJobItemCont
     
     protected abstract void doPrepare(T jobItemContext) throws SQLException;
     
-    private void processFailed(final PipelineJobManager jobManager, final 
String jobId, final int shardingItem, final Exception ex) {
+    private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
         log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
         try {
-            jobManager.stop(jobId);
+            new 
PipelineJobManager(PipelineJobIdUtils.parseJobType(jobId)).stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
         }
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index a9854bfd848..133ac166992 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -91,7 +91,7 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemConte
         super(jobId);
         this.sink = sink;
         jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
-        jobItemManager = new 
PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
+        jobItemManager = new PipelineJobItemManager<>(new 
CDCJobType().getYamlJobItemProgressSwapper());
         processConfigPersistService = new 
PipelineProcessConfigurationPersistService();
         dataSourceManager = new DefaultPipelineDataSourceManager();
         jobPreparer = new CDCJobPreparer();
@@ -101,7 +101,7 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemConte
     protected CDCJobItemContext buildJobItemContext(final 
PipelineJobConfiguration jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 getJobType().getType()));
+                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 "STREAMING"));
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = 
buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext((CDCJobConfiguration) jobConfig, 
shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, 
dataSourceManager, sink);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 61aa3e775fc..df48a9815cc 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
@@ -34,7 +33,6 @@ import java.util.Optional;
 /**
  * Consistency check job.
  */
-@Slf4j
 public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {
     
     public ConsistencyCheckJob(final String jobId) {
@@ -44,7 +42,7 @@ public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<Cons
     @Override
     public ConsistencyCheckJobItemContext buildJobItemContext(final 
ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(new 
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), 
shardingContext.getShardingItem());
         return new ConsistencyCheckJobItemContext(jobConfig, 
shardingContext.getShardingItem(), JobStatus.RUNNING, 
jobItemProgress.orElse(null));
     }

Reply via email to