This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 30bd2f72ad1 Improve job progress swapper (#20029)
30bd2f72ad1 is described below
commit 30bd2f72ad10fa82e447f18c021563eccd6acb99
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Aug 10 01:13:46 2022 +0800
Improve job progress swapper (#20029)
---
...YamlJobItemIncrementalTasksProgressSwapper.java | 6 ++---
.../YamlJobItemInventoryTasksProgressSwapper.java | 5 ++--
.../progress/yaml/YamlJobProgressSwapperTest.java | 30 +++++++++++++++-------
...no-inventory.yaml => job-progress-failure.yaml} | 8 +-----
4 files changed, 28 insertions(+), 21 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index c2d7a467b53..b78115699a3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -35,7 +35,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper {
*/
public YamlJobItemIncrementalTasksProgress swapToYaml(final
JobItemIncrementalTasksProgress progress) {
if (null == progress) {
- return null;
+ return new YamlJobItemIncrementalTasksProgress();
}
return progress.getIncrementalTaskProgressMap()
.entrySet().stream()
@@ -45,7 +45,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper {
result.setPosition(entry.getValue().getPosition().toString());
result.setDelay(entry.getValue().getIncrementalTaskDelay());
return result;
- }).findAny().orElse(null);
+ }).findAny().orElse(new YamlJobItemIncrementalTasksProgress());
}
/**
@@ -57,7 +57,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper {
*/
public JobItemIncrementalTasksProgress swapToObject(final String
databaseType, final YamlJobItemIncrementalTasksProgress yamlProgress) {
if (null == yamlProgress) {
- return null;
+ return new JobItemIncrementalTasksProgress(Collections.emptyMap());
}
IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
// TODO databaseType
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
index 2516af04b6a..add094b8604 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventory
import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
@@ -43,7 +44,7 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
*/
public YamlJobItemInventoryTasksProgress swapToYaml(final
JobItemInventoryTasksProgress progress) {
YamlJobItemInventoryTasksProgress result = new
YamlJobItemInventoryTasksProgress();
- if (progress != null) {
+ if (null != progress) {
result.setFinished(getFinished(progress));
result.setUnfinished(getUnfinished(progress));
}
@@ -70,7 +71,7 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
*/
public JobItemInventoryTasksProgress swapToObject(final
YamlJobItemInventoryTasksProgress yamlProgress) {
if (null == yamlProgress) {
- return null;
+ return new JobItemInventoryTasksProgress(Collections.emptyMap());
}
Map<String, InventoryTaskProgress> taskProgressMap = new
LinkedHashMap<>();
taskProgressMap.putAll(Arrays.stream(yamlProgress.getFinished()).collect(Collectors.toMap(key
-> key, value -> new InventoryTaskProgress(new FinishedPosition()))));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
index c385ac9a9f6..2cf2e59b4bd 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
@@ -24,19 +24,16 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
public final class YamlJobProgressSwapperTest {
private static final YamlJobProgressSwapper SWAPPER = new
YamlJobProgressSwapper();
- private JobProgress getJobProgress(final String data) {
- return SWAPPER.swapToObject(YamlEngine.unmarshal(data,
YamlJobProgress.class));
- }
-
@Test
public void assertFullSwapToYamlConfiguration() {
- JobProgress jobProgress =
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
+ JobProgress jobProgress =
SWAPPER.swapToObject(YamlEngine.unmarshal(ConfigurationFileUtil.readFile("job-progress.yaml"),
YamlJobProgress.class));
YamlJobProgress actual = SWAPPER.swapToYamlConfiguration(jobProgress);
assertThat(actual.getStatus(), is("RUNNING"));
assertThat(actual.getSourceDatabaseType(), is("H2"));
@@ -50,9 +47,24 @@ public final class YamlJobProgressSwapperTest {
}
@Test
- public void assertSwapToYamlConfigurationWithNullInventory() {
- JobProgress jobProgress =
getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-inventory.yaml"));
- YamlJobProgress actual = SWAPPER.swapToYamlConfiguration(jobProgress);
- assertThat(actual.getInventory().getFinished().length, is(0));
+ public void assertSwapWithFullConfig() {
+ YamlJobProgress yamlProgress =
YamlEngine.unmarshal(ConfigurationFileUtil.readFile("job-progress.yaml"),
YamlJobProgress.class);
+ YamlJobProgress actual =
SWAPPER.swapToYamlConfiguration(SWAPPER.swapToObject(yamlProgress));
+ assertThat(YamlEngine.marshal(actual),
is(YamlEngine.marshal(yamlProgress)));
+ }
+
+ @Test
+ public void assertSwapWithoutInventoryIncremental() {
+ YamlJobProgress yamlProgress =
YamlEngine.unmarshal(ConfigurationFileUtil.readFile("job-progress-failure.yaml"),
YamlJobProgress.class);
+ JobProgress progress = SWAPPER.swapToObject(yamlProgress);
+ assertNotNull(progress.getInventory());
+ assertNotNull(progress.getIncremental());
+ assertThat(progress.getInventory().getInventoryFinishedPercentage(),
is(0));
+ assertThat(progress.getIncremental().getDataSourceName(), is(""));
+
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(),
is(0L));
+ YamlJobProgress actual = SWAPPER.swapToYamlConfiguration(progress);
+ assertNotNull(actual.getInventory());
+ assertNotNull(actual.getIncremental());
+ assertThat(YamlEngine.marshal(actual),
is(YamlEngine.marshal(yamlProgress)));
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
similarity index 85%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
index 897258a53ce..ce53fea4473 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-inventory.yaml
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-failure.yaml
@@ -15,11 +15,5 @@
# limitations under the License.
#
#
-incremental:
- dataSourceName: ds0
- delay:
- lastEventTimestamps: 0
- latestActiveTimeMillis: 0
- position: ''
sourceDatabaseType: H2
-status: RUNNING
+status: PREPARING_FAILURE