This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 30bd2f72ad1 Improve job progress swapper (#20029)
30bd2f72ad1 is described below

commit 30bd2f72ad10fa82e447f18c021563eccd6acb99
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Aug 10 01:13:46 2022 +0800

    Improve job progress swapper (#20029)
---
 ...YamlJobItemIncrementalTasksProgressSwapper.java |  6 ++---
 .../YamlJobItemInventoryTasksProgressSwapper.java  |  5 ++--
 .../progress/yaml/YamlJobProgressSwapperTest.java  | 30 +++++++++++++++-------
 ...no-inventory.yaml => job-progress-failure.yaml} |  8 +-----
 4 files changed, 28 insertions(+), 21 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index c2d7a467b53..b78115699a3 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -35,7 +35,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper 
{
      */
     public YamlJobItemIncrementalTasksProgress swapToYaml(final 
JobItemIncrementalTasksProgress progress) {
         if (null == progress) {
-            return null;
+            return new YamlJobItemIncrementalTasksProgress();
         }
         return progress.getIncrementalTaskProgressMap()
                 .entrySet().stream()
@@ -45,7 +45,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper 
{
                     
result.setPosition(entry.getValue().getPosition().toString());
                     
result.setDelay(entry.getValue().getIncrementalTaskDelay());
                     return result;
-                }).findAny().orElse(null);
+                }).findAny().orElse(new YamlJobItemIncrementalTasksProgress());
     }
     
     /**
@@ -57,7 +57,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper 
{
      */
     public JobItemIncrementalTasksProgress swapToObject(final String 
databaseType, final YamlJobItemIncrementalTasksProgress yamlProgress) {
         if (null == yamlProgress) {
-            return null;
+            return new JobItemIncrementalTasksProgress(Collections.emptyMap());
         }
         IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
         // TODO databaseType
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
index 2516af04b6a..add094b8604 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventory
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.function.Function;
@@ -43,7 +44,7 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
      */
     public YamlJobItemInventoryTasksProgress swapToYaml(final 
JobItemInventoryTasksProgress progress) {
         YamlJobItemInventoryTasksProgress result = new 
YamlJobItemInventoryTasksProgress();
-        if (progress != null) {
+        if (null != progress) {
             result.setFinished(getFinished(progress));
             result.setUnfinished(getUnfinished(progress));
         }
@@ -70,7 +71,7 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
      */
     public JobItemInventoryTasksProgress swapToObject(final 
YamlJobItemInventoryTasksProgress yamlProgress) {
         if (null == yamlProgress) {
-            return null;
+            return new JobItemInventoryTasksProgress(Collections.emptyMap());
         }
         Map<String, InventoryTaskProgress> taskProgressMap = new 
LinkedHashMap<>();
         
taskProgressMap.putAll(Arrays.stream(yamlProgress.getFinished()).collect(Collectors.toMap(key
 -> key, value -> new InventoryTaskProgress(new FinishedPosition()))));
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
index c385ac9a9f6..2cf2e59b4bd 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
@@ -24,19 +24,16 @@ import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 public final class YamlJobProgressSwapperTest {
     
     private static final YamlJobProgressSwapper SWAPPER = new 
YamlJobProgressSwapper();
     
-    private JobProgress getJobProgress(final String data) {
-        return SWAPPER.swapToObject(YamlEngine.unmarshal(data, 
YamlJobProgress.class));
-    }
-    
     @Test
     public void assertFullSwapToYamlConfiguration() {
-        JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
+        JobProgress jobProgress = 
SWAPPER.swapToObject(YamlEngine.unmarshal(ConfigurationFileUtil.readFile("job-progress.yaml"),
 YamlJobProgress.class));
         YamlJobProgress actual = SWAPPER.swapToYamlConfiguration(jobProgress);
         assertThat(actual.getStatus(), is("RUNNING"));
         assertThat(actual.getSourceDatabaseType(), is("H2"));
@@ -50,9 +47,24 @@ public final class YamlJobProgressSwapperTest {
     }
     
     @Test
-    public void assertSwapToYamlConfigurationWithNullInventory() {
-        JobProgress jobProgress = 
getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-inventory.yaml"));
-        YamlJobProgress actual = SWAPPER.swapToYamlConfiguration(jobProgress);
-        assertThat(actual.getInventory().getFinished().length, is(0));
+    public void assertSwapWithFullConfig() {
+        YamlJobProgress yamlProgress = 
YamlEngine.unmarshal(ConfigurationFileUtil.readFile("job-progress.yaml"), 
YamlJobProgress.class);
+        YamlJobProgress actual = 
SWAPPER.swapToYamlConfiguration(SWAPPER.swapToObject(yamlProgress));
+        assertThat(YamlEngine.marshal(actual), 
is(YamlEngine.marshal(yamlProgress)));
+    }
+    
+    @Test
+    public void assertSwapWithoutInventoryIncremental() {
+        YamlJobProgress yamlProgress = 
YamlEngine.unmarshal(ConfigurationFileUtil.readFile("job-progress-failure.yaml"),
 YamlJobProgress.class);
+        JobProgress progress = SWAPPER.swapToObject(yamlProgress);
+        assertNotNull(progress.getInventory());
+        assertNotNull(progress.getIncremental());
+        assertThat(progress.getInventory().getInventoryFinishedPercentage(), 
is(0));
+        assertThat(progress.getIncremental().getDataSourceName(), is(""));
+        
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), 
is(0L));
+        YamlJobProgress actual = SWAPPER.swapToYamlConfiguration(progress);
+        assertNotNull(actual.getInventory());
+        assertNotNull(actual.getIncremental());
+        assertThat(YamlEngine.marshal(actual), 
is(YamlEngine.marshal(yamlProgress)));
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
similarity index 85%
rename from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
index 897258a53ce..ce53fea4473 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
@@ -15,11 +15,5 @@
 # limitations under the License.
 #
 #
-incremental:
-  dataSourceName: ds0
-  delay:
-    lastEventTimestamps: 0
-    latestActiveTimeMillis: 0
-  position: ''
 sourceDatabaseType: H2
-status: RUNNING
+status: PREPARING_FAILURE

Reply via email to