This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e98467e43f5 Refactor TransmissionJobItemProgress (#32736)
e98467e43f5 is described below

commit e98467e43f52a80593d0a167523843298d88bd22
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Fri Aug 30 19:39:17 2024 +0800

    Refactor TransmissionJobItemProgress (#32736)
    
    * Refactor PipelineJobRunnerManager
    
    * Refactor TransmissionJobItemProgress
---
 .../core/job/AbstractSeparablePipelineJob.java         |  2 +-
 .../core/job/progress/TransmissionJobItemProgress.java | 18 +++++++++---------
 .../YamlTransmissionJobItemProgressSwapper.java        | 12 +++++-------
 .../job/progress/PipelineJobProgressDetectorTest.java  | 16 ++++++++--------
 .../data/pipeline/cdc/api/CDCJobAPI.java               |  7 ++-----
 5 files changed, 25 insertions(+), 30 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 196034b3549..648665e70b7 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -124,7 +124,7 @@ public abstract class AbstractSeparablePipelineJob<T 
extends PipelineJobConfigur
     
     protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
     
-    protected final void prepare(final I jobItemContext) {
+    private void prepare(final I jobItemContext) {
         try {
             doPrepare(jobItemContext);
             // CHECKSTYLE:OFF
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
index 56c18fa144c..88876f0763a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
@@ -18,13 +18,13 @@
 package org.apache.shardingsphere.data.pipeline.core.job.progress;
 
 import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Collection;
@@ -34,22 +34,22 @@ import java.util.Map;
 /**
  * Transmission job item progress.
  */
-@NoArgsConstructor
+@RequiredArgsConstructor
 @Getter
 @Setter
 public final class TransmissionJobItemProgress implements 
PipelineJobItemProgress {
     
-    private DatabaseType sourceDatabaseType;
+    private final DatabaseType sourceDatabaseType;
     
-    private String dataSourceName;
+    private final String dataSourceName;
     
-    private JobItemInventoryTasksProgress inventory;
+    private final JobItemInventoryTasksProgress inventory;
     
-    private JobItemIncrementalTasksProgress incremental;
+    private final JobItemIncrementalTasksProgress incremental;
     
-    private long inventoryRecordsCount;
+    private final long inventoryRecordsCount;
     
-    private long processedRecordsCount;
+    private final long processedRecordsCount;
     
     private boolean active;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
index 43e63745c3a..4082d0dc487 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
@@ -47,14 +47,12 @@ public final class YamlTransmissionJobItemProgressSwapper 
implements YamlPipelin
     
     @Override
     public TransmissionJobItemProgress swapToObject(final 
YamlTransmissionJobItemProgress yamlProgress) {
-        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
+        TransmissionJobItemProgress result = new TransmissionJobItemProgress(
+                TypedSPILoader.getService(DatabaseType.class, 
yamlProgress.getSourceDatabaseType()),
+                yamlProgress.getDataSourceName(), 
inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()),
+                
incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
 yamlProgress.getIncremental()),
+                yamlProgress.getInventoryRecordsCount(), 
yamlProgress.getProcessedRecordsCount());
         result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
-        
result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, 
yamlProgress.getSourceDatabaseType()));
-        result.setDataSourceName(yamlProgress.getDataSourceName());
-        
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
-        
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
 yamlProgress.getIncremental()));
-        
result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
-        
result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
index 9a3bfdd5971..850cf0d1991 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 class PipelineJobProgressDetectorTest {
     
@@ -67,31 +68,30 @@ class PipelineJobProgressDetectorTest {
     
     @Test
     void assertIsInventoryFinishedWhenJobCountDoesNotMatchJobItemProgresses() {
-        TransmissionJobItemProgress transmissionJobItemProgress = new 
TransmissionJobItemProgress();
-        assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, 
Collections.singleton(transmissionJobItemProgress)));
+        assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, 
Collections.singleton(mock(TransmissionJobItemProgress.class))));
     }
     
     @Test
     void assertIsInventoryFinishedWhenInventoryTaskProgressHasEmptyMap() {
         JobItemInventoryTasksProgress jobItemInventoryTasksProgress = new 
JobItemInventoryTasksProgress(Collections.emptyMap());
-        TransmissionJobItemProgress transmissionJobItemProgress = new 
TransmissionJobItemProgress();
-        
transmissionJobItemProgress.setInventory(jobItemInventoryTasksProgress);
+        TransmissionJobItemProgress transmissionJobItemProgress = 
mock(TransmissionJobItemProgress.class);
+        
when(transmissionJobItemProgress.getInventory()).thenReturn(jobItemInventoryTasksProgress);
         assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, 
Collections.singleton(transmissionJobItemProgress)));
     }
     
     @Test
     void assertIsInventoryFinishedWhenNotAllInventoryTasksCompleted() {
         JobItemInventoryTasksProgress inventoryTasksProgress = new 
JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new 
InventoryTaskProgress(new IngestPlaceholderPosition())));
-        TransmissionJobItemProgress transmissionJobItemProgress = new 
TransmissionJobItemProgress();
-        transmissionJobItemProgress.setInventory(inventoryTasksProgress);
+        TransmissionJobItemProgress transmissionJobItemProgress = 
mock(TransmissionJobItemProgress.class);
+        
when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
         assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, 
Collections.singleton(transmissionJobItemProgress)));
     }
     
     @Test
     void assertIsInventoryFinished() {
         JobItemInventoryTasksProgress inventoryTasksProgress = new 
JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new 
InventoryTaskProgress(new IngestFinishedPosition())));
-        TransmissionJobItemProgress transmissionJobItemProgress = new 
TransmissionJobItemProgress();
-        transmissionJobItemProgress.setInventory(inventoryTasksProgress);
+        TransmissionJobItemProgress transmissionJobItemProgress = 
mock(TransmissionJobItemProgress.class);
+        
when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
         assertTrue(PipelineJobProgressDetector.isInventoryFinished(1, 
Collections.singleton(transmissionJobItemProgress)));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 2e9d6187cf8..b8d5447ce3f 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -207,13 +207,10 @@ public final class CDCJobAPI implements 
TransmissionJobAPI {
     
     private TransmissionJobItemProgress getTransmissionJobItemProgress(final 
CDCJobConfiguration jobConfig, final PipelineDataSourceManager 
dataSourceManager,
                                                                        final 
IncrementalDumperContext incrementalDumperContext) throws SQLException {
-        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
-        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
-        
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
         IncrementalTaskPositionManager positionManager = new 
IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
         IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress(positionManager.getPosition(null, 
incrementalDumperContext, dataSourceManager));
-        result.setIncremental(new 
JobItemIncrementalTasksProgress(incrementalTaskProgress));
-        return result;
+        return new 
TransmissionJobItemProgress(jobConfig.getSourceDatabaseType(), 
incrementalDumperContext.getCommonContext().getDataSourceName(), null,
+                new JobItemIncrementalTasksProgress(incrementalTaskProgress), 
0L, 0L);
     }
     
     /**

Reply via email to