This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5e3f259a8b4 Fix get inventory position not correctly at Pipeline
(#28356)
5e3f259a8b4 is described below
commit 5e3f259a8b49d9fa57aaedf871a7f7d7fd62d7e1
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Sep 5 19:42:33 2023 +0800
Fix get inventory position not correctly at Pipeline (#28356)
* Fix get inventory position not correctly
* Change integer to int
---
.../ingest/InventoryDumperConfiguration.java | 2 +-
.../progress/JobItemInventoryTasksProgress.java | 2 +-
.../data/pipeline/core/task/PipelineTaskUtils.java | 2 +-
.../InventoryIncrementalJobItemProgressTest.java | 35 ++++++++++++++++++----
...ntoryIncrementalJobItemProgressSwapperTest.java | 6 ++--
.../core/src/test/resources/job-progress.yaml | 8 ++---
6 files changed, 40 insertions(+), 15 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index b53da97d117..6663e20af4a 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -46,7 +46,7 @@ public final class InventoryDumperConfiguration extends
DumperConfiguration {
private Integer transactionIsolation;
- private Integer shardingItem;
+ private int shardingItem;
private int batchSize = 1000;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/JobItemInventoryTasksProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/JobItemInventoryTasksProgress.java
index cad206aba76..962021ea339 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/JobItemInventoryTasksProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/JobItemInventoryTasksProgress.java
@@ -43,7 +43,7 @@ public final class JobItemInventoryTasksProgress {
* @return inventory position
*/
public Map<String, IngestPosition> getInventoryPosition(final String
tableName) {
- Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?",
tableName));
+ Pattern pattern = Pattern.compile(String.format("\\.%s#(\\d+)?",
tableName));
return progresses.entrySet().stream().filter(entry ->
pattern.matcher(entry.getKey()).find()).collect(Collectors.toMap(Entry::getKey,
entry -> entry.getValue().getPosition()));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index 8850c511de0..49d29c2405b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -44,7 +44,7 @@ public final class PipelineTaskUtils {
*/
public static String generateInventoryTaskId(final
InventoryDumperConfiguration inventoryDumperConfig) {
String result = String.format("%s.%s",
inventoryDumperConfig.getDataSourceName(),
inventoryDumperConfig.getActualTableName());
- return null == inventoryDumperConfig.getShardingItem() ? result :
result + "#" + inventoryDumperConfig.getShardingItem();
+ return result + "#" + inventoryDumperConfig.getShardingItem();
}
/**
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
index 823b7aa50d6..c19e7a466cf 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -19,15 +19,20 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
+import
org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.test.util.ConfigurationFileUtils;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -60,10 +65,10 @@ class InventoryIncrementalJobItemProgressTest {
@Test
void assertGetInventoryPosition() {
InventoryIncrementalJobItemProgress actual =
getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml"));
- assertThat(actual.getInventory().getInventoryPosition("ds0").size(),
is(2));
-
assertThat(actual.getInventory().getInventoryPosition("ds0").get("ds0.t_1"),
instanceOf(FinishedPosition.class));
-
assertThat(actual.getInventory().getInventoryPosition("ds1").get("ds1.t_1"),
instanceOf(PlaceholderPosition.class));
-
assertThat(actual.getInventory().getInventoryPosition("ds1").get("ds1.t_2"),
instanceOf(IntegerPrimaryKeyPosition.class));
+
assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds0.t_1#1"),
instanceOf(FinishedPosition.class));
+
assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds1.t_1#1"),
instanceOf(PlaceholderPosition.class));
+
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds0.t_2#2"),
instanceOf(FinishedPosition.class));
+
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds1.t_2#2"),
instanceOf(IntegerPrimaryKeyPosition.class));
}
@Test
@@ -76,6 +81,26 @@ class InventoryIncrementalJobItemProgressTest {
assertThat(getJobItemProgress(ConfigurationFileUtils.readFile("job-progress-all-finished.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(),
is(50L));
}
+ @Test
+ void assertGetProgressesCorrectly() {
+ Map<String, InventoryTaskProgress> progresses = new HashMap<>();
+ progresses.put("ds.order_item#0", new InventoryTaskProgress(new
IntegerPrimaryKeyPosition(1, 100)));
+ progresses.put("ds.order_item#1", new InventoryTaskProgress(new
UnsupportedKeyPosition()));
+ progresses.put("ds.order#0", new InventoryTaskProgress(new
FinishedPosition()));
+ progresses.put("ds.test_order#0", new InventoryTaskProgress(new
StringPrimaryKeyPosition("1", "100")));
+ JobItemInventoryTasksProgress progress = new
JobItemInventoryTasksProgress(progresses);
+ Map<String, IngestPosition> orderPosition =
progress.getInventoryPosition("order");
+ assertThat(orderPosition.size(), is(1));
+ assertThat(orderPosition.get("ds.order#0"),
instanceOf(FinishedPosition.class));
+ Map<String, IngestPosition> testOrderPosition =
progress.getInventoryPosition("test_order");
+ assertThat(testOrderPosition.size(), is(1));
+ assertThat(testOrderPosition.get("ds.test_order#0"),
instanceOf(StringPrimaryKeyPosition.class));
+ Map<String, IngestPosition> orderItemPosition =
progress.getInventoryPosition("order_item");
+ assertThat(orderItemPosition.size(), is(2));
+ assertThat(orderItemPosition.get("ds.order_item#0"),
instanceOf(IntegerPrimaryKeyPosition.class));
+ assertThat(orderItemPosition.get("ds.order_item#1"),
instanceOf(UnsupportedKeyPosition.class));
+ }
+
private InventoryIncrementalJobItemProgress getJobItemProgress(final
String data) {
return SWAPPER.swapToObject(YamlEngine.unmarshal(data,
YamlInventoryIncrementalJobItemProgress.class));
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
index 109080d638a..31eac91f4f8 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
@@ -38,10 +38,10 @@ class YamlInventoryIncrementalJobItemProgressSwapperTest {
assertThat(actual.getSourceDatabaseType(), is("H2"));
assertThat(actual.getDataSourceName(), is("ds_0"));
assertThat(actual.getInventory().getFinished().length, is(2));
- assertThat(actual.getInventory().getFinished(), is(new
String[]{"ds0.t_2", "ds0.t_1"}));
+ assertThat(actual.getInventory().getFinished(), is(new
String[]{"ds0.t_2#2", "ds0.t_1#1"}));
assertThat(actual.getInventory().getUnfinished().size(), is(2));
- assertThat(actual.getInventory().getUnfinished().get("ds1.t_2"),
is("i,1,2"));
- assertThat(actual.getInventory().getUnfinished().get("ds1.t_1"),
is(""));
+ assertThat(actual.getInventory().getUnfinished().get("ds1.t_2#2"),
is("i,1,2"));
+ assertThat(actual.getInventory().getUnfinished().get("ds1.t_1#1"),
is(""));
assertThat(actual.getIncremental().getPosition().length(), is(0));
}
diff --git a/kernel/data-pipeline/core/src/test/resources/job-progress.yaml
b/kernel/data-pipeline/core/src/test/resources/job-progress.yaml
index fa82043980c..b9267ec1f2d 100644
--- a/kernel/data-pipeline/core/src/test/resources/job-progress.yaml
+++ b/kernel/data-pipeline/core/src/test/resources/job-progress.yaml
@@ -23,10 +23,10 @@ incremental:
position: ''
inventory:
finished:
- - ds0.t_2
- - ds0.t_1
+ - ds0.t_2#2
+ - ds0.t_1#1
unfinished:
- ds1.t_2: i,1,2
- ds1.t_1: ''
+ ds1.t_1#1: ''
+ ds1.t_2#2: i,1,2
sourceDatabaseType: H2
status: RUNNING