This is an automated email from the ASF dual-hosted git repository. sunnianjun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new e98467e43f5 Refactor TransmissionJobItemProgress (#32736) e98467e43f5 is described below commit e98467e43f52a80593d0a167523843298d88bd22 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Fri Aug 30 19:39:17 2024 +0800 Refactor TransmissionJobItemProgress (#32736) * Refactor PipelineJobRunnerManager * Refactor TransmissionJobItemProgress --- .../core/job/AbstractSeparablePipelineJob.java | 2 +- .../core/job/progress/TransmissionJobItemProgress.java | 18 +++++++++--------- .../YamlTransmissionJobItemProgressSwapper.java | 12 +++++------- .../job/progress/PipelineJobProgressDetectorTest.java | 16 ++++++++-------- .../data/pipeline/cdc/api/CDCJobAPI.java | 7 ++----- 5 files changed, 25 insertions(+), 30 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java index 196034b3549..648665e70b7 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java @@ -124,7 +124,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext); - protected final void prepare(final I jobItemContext) { + private void prepare(final I jobItemContext) { try { doPrepare(jobItemContext); // CHECKSTYLE:OFF diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java index 56c18fa144c..88876f0763a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java @@ -18,13 +18,13 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress; import lombok.Getter; -import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.Setter; import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; +import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress; -import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.Collection; @@ -34,22 +34,22 @@ import java.util.Map; /** * Transmission job item progress. */ -@NoArgsConstructor +@RequiredArgsConstructor @Getter @Setter public final class TransmissionJobItemProgress implements PipelineJobItemProgress { - private DatabaseType sourceDatabaseType; + private final DatabaseType sourceDatabaseType; - private String dataSourceName; + private final String dataSourceName; - private JobItemInventoryTasksProgress inventory; + private final JobItemInventoryTasksProgress inventory; - private JobItemIncrementalTasksProgress incremental; + private final JobItemIncrementalTasksProgress incremental; - private long inventoryRecordsCount; + private final long inventoryRecordsCount; - private long processedRecordsCount; + private final long processedRecordsCount; private boolean active; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java index 43e63745c3a..4082d0dc487 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java @@ -47,14 +47,12 @@ public final class YamlTransmissionJobItemProgressSwapper implements YamlPipelin @Override public TransmissionJobItemProgress swapToObject(final YamlTransmissionJobItemProgress yamlProgress) { - TransmissionJobItemProgress result = new TransmissionJobItemProgress(); + TransmissionJobItemProgress result = new TransmissionJobItemProgress( + TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()), + yamlProgress.getDataSourceName(), inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()), + incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()), + yamlProgress.getInventoryRecordsCount(), yamlProgress.getProcessedRecordsCount()); result.setStatus(JobStatus.valueOf(yamlProgress.getStatus())); - result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType())); - result.setDataSourceName(yamlProgress.getDataSourceName()); - result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory())); - result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental())); - result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount()); - result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount()); return result; } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java index 9a3bfdd5971..850cf0d1991 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class PipelineJobProgressDetectorTest { @@ -67,31 +68,30 @@ class PipelineJobProgressDetectorTest { @Test void assertIsInventoryFinishedWhenJobCountDoesNotMatchJobItemProgresses() { - TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); - assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(transmissionJobItemProgress))); + assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(mock(TransmissionJobItemProgress.class)))); } @Test void assertIsInventoryFinishedWhenInventoryTaskProgressHasEmptyMap() { JobItemInventoryTasksProgress jobItemInventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.emptyMap()); - TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); - transmissionJobItemProgress.setInventory(jobItemInventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); + when(transmissionJobItemProgress.getInventory()).thenReturn(jobItemInventoryTasksProgress); assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } @Test void assertIsInventoryFinishedWhenNotAllInventoryTasksCompleted() { JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestPlaceholderPosition()))); - TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); - transmissionJobItemProgress.setInventory(inventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); + when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress); assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } @Test void assertIsInventoryFinished() { JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestFinishedPosition()))); - TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); - transmissionJobItemProgress.setInventory(inventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); + when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress); assertTrue(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index 2e9d6187cf8..b8d5447ce3f 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -207,13 +207,10 @@ public final class CDCJobAPI implements TransmissionJobAPI { private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager, final IncrementalDumperContext incrementalDumperContext) throws SQLException { - TransmissionJobItemProgress result = new TransmissionJobItemProgress(); - result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); - result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()); IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager)); - result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); - return result; + return new TransmissionJobItemProgress(jobConfig.getSourceDatabaseType(), incrementalDumperContext.getCommonContext().getDataSourceName(), null, + new JobItemIncrementalTasksProgress(incrementalTaskProgress), 0L, 0L); } /**