This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e3f259a8b4 Fix get inventory position not correctly at Pipeline 
(#28356)
5e3f259a8b4 is described below

commit 5e3f259a8b49d9fa57aaedf871a7f7d7fd62d7e1
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Sep 5 19:42:33 2023 +0800

    Fix get inventory position not correctly at Pipeline (#28356)
    
    * Fix get inventory position not correctly
    
    * Change integer to int
---
 .../ingest/InventoryDumperConfiguration.java       |  2 +-
 .../progress/JobItemInventoryTasksProgress.java    |  2 +-
 .../data/pipeline/core/task/PipelineTaskUtils.java |  2 +-
 .../InventoryIncrementalJobItemProgressTest.java   | 35 ++++++++++++++++++----
 ...ntoryIncrementalJobItemProgressSwapperTest.java |  6 ++--
 .../core/src/test/resources/job-progress.yaml      |  8 ++---
 6 files changed, 40 insertions(+), 15 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index b53da97d117..6663e20af4a 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -46,7 +46,7 @@ public final class InventoryDumperConfiguration extends 
DumperConfiguration {
     
     private Integer transactionIsolation;
     
-    private Integer shardingItem;
+    private int shardingItem;
     
     private int batchSize = 1000;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/JobItemInventoryTasksProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/JobItemInventoryTasksProgress.java
index cad206aba76..962021ea339 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/JobItemInventoryTasksProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/JobItemInventoryTasksProgress.java
@@ -43,7 +43,7 @@ public final class JobItemInventoryTasksProgress {
      * @return inventory position
      */
     public Map<String, IngestPosition> getInventoryPosition(final String 
tableName) {
-        Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?", 
tableName));
+        Pattern pattern = Pattern.compile(String.format("\\.%s#(\\d+)?", 
tableName));
         return progresses.entrySet().stream().filter(entry -> 
pattern.matcher(entry.getKey()).find()).collect(Collectors.toMap(Entry::getKey, 
entry -> entry.getValue().getPosition()));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index 8850c511de0..49d29c2405b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -44,7 +44,7 @@ public final class PipelineTaskUtils {
      */
     public static String generateInventoryTaskId(final 
InventoryDumperConfiguration inventoryDumperConfig) {
         String result = String.format("%s.%s", 
inventoryDumperConfig.getDataSourceName(), 
inventoryDumperConfig.getActualTableName());
-        return null == inventoryDumperConfig.getShardingItem() ? result : 
result + "#" + inventoryDumperConfig.getShardingItem();
+        return result + "#" + inventoryDumperConfig.getShardingItem();
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
index 823b7aa50d6..c19e7a466cf 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -19,15 +19,20 @@ package 
org.apache.shardingsphere.data.pipeline.common.job.progress;
 
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
-import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
+import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition;
+import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
+import 
org.apache.shardingsphere.data.pipeline.common.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.test.util.ConfigurationFileUtils;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -60,10 +65,10 @@ class InventoryIncrementalJobItemProgressTest {
     @Test
     void assertGetInventoryPosition() {
         InventoryIncrementalJobItemProgress actual = 
getJobItemProgress(ConfigurationFileUtils.readFile("job-progress.yaml"));
-        assertThat(actual.getInventory().getInventoryPosition("ds0").size(), 
is(2));
-        
assertThat(actual.getInventory().getInventoryPosition("ds0").get("ds0.t_1"), 
instanceOf(FinishedPosition.class));
-        
assertThat(actual.getInventory().getInventoryPosition("ds1").get("ds1.t_1"), 
instanceOf(PlaceholderPosition.class));
-        
assertThat(actual.getInventory().getInventoryPosition("ds1").get("ds1.t_2"), 
instanceOf(IntegerPrimaryKeyPosition.class));
+        
assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds0.t_1#1"), 
instanceOf(FinishedPosition.class));
+        
assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds1.t_1#1"), 
instanceOf(PlaceholderPosition.class));
+        
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds0.t_2#2"), 
instanceOf(FinishedPosition.class));
+        
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds1.t_2#2"), 
instanceOf(IntegerPrimaryKeyPosition.class));
     }
     
     @Test
@@ -76,6 +81,26 @@ class InventoryIncrementalJobItemProgressTest {
         
assertThat(getJobItemProgress(ConfigurationFileUtils.readFile("job-progress-all-finished.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(),
 is(50L));
     }
     
+    @Test
+    void assertGetProgressesCorrectly() {
+        Map<String, InventoryTaskProgress> progresses = new HashMap<>();
+        progresses.put("ds.order_item#0", new InventoryTaskProgress(new 
IntegerPrimaryKeyPosition(1, 100)));
+        progresses.put("ds.order_item#1", new InventoryTaskProgress(new 
UnsupportedKeyPosition()));
+        progresses.put("ds.order#0", new InventoryTaskProgress(new 
FinishedPosition()));
+        progresses.put("ds.test_order#0", new InventoryTaskProgress(new 
StringPrimaryKeyPosition("1", "100")));
+        JobItemInventoryTasksProgress progress = new 
JobItemInventoryTasksProgress(progresses);
+        Map<String, IngestPosition> orderPosition = 
progress.getInventoryPosition("order");
+        assertThat(orderPosition.size(), is(1));
+        assertThat(orderPosition.get("ds.order#0"), 
instanceOf(FinishedPosition.class));
+        Map<String, IngestPosition> testOrderPosition = 
progress.getInventoryPosition("test_order");
+        assertThat(testOrderPosition.size(), is(1));
+        assertThat(testOrderPosition.get("ds.test_order#0"), 
instanceOf(StringPrimaryKeyPosition.class));
+        Map<String, IngestPosition> orderItemPosition = 
progress.getInventoryPosition("order_item");
+        assertThat(orderItemPosition.size(), is(2));
+        assertThat(orderItemPosition.get("ds.order_item#0"), 
instanceOf(IntegerPrimaryKeyPosition.class));
+        assertThat(orderItemPosition.get("ds.order_item#1"), 
instanceOf(UnsupportedKeyPosition.class));
+    }
+    
     private InventoryIncrementalJobItemProgress getJobItemProgress(final 
String data) {
         return SWAPPER.swapToObject(YamlEngine.unmarshal(data, 
YamlInventoryIncrementalJobItemProgress.class));
     }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
index 109080d638a..31eac91f4f8 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
@@ -38,10 +38,10 @@ class YamlInventoryIncrementalJobItemProgressSwapperTest {
         assertThat(actual.getSourceDatabaseType(), is("H2"));
         assertThat(actual.getDataSourceName(), is("ds_0"));
         assertThat(actual.getInventory().getFinished().length, is(2));
-        assertThat(actual.getInventory().getFinished(), is(new 
String[]{"ds0.t_2", "ds0.t_1"}));
+        assertThat(actual.getInventory().getFinished(), is(new 
String[]{"ds0.t_2#2", "ds0.t_1#1"}));
         assertThat(actual.getInventory().getUnfinished().size(), is(2));
-        assertThat(actual.getInventory().getUnfinished().get("ds1.t_2"), 
is("i,1,2"));
-        assertThat(actual.getInventory().getUnfinished().get("ds1.t_1"), 
is(""));
+        assertThat(actual.getInventory().getUnfinished().get("ds1.t_2#2"), 
is("i,1,2"));
+        assertThat(actual.getInventory().getUnfinished().get("ds1.t_1#1"), 
is(""));
         assertThat(actual.getIncremental().getPosition().length(), is(0));
     }
     
diff --git a/kernel/data-pipeline/core/src/test/resources/job-progress.yaml 
b/kernel/data-pipeline/core/src/test/resources/job-progress.yaml
index fa82043980c..b9267ec1f2d 100644
--- a/kernel/data-pipeline/core/src/test/resources/job-progress.yaml
+++ b/kernel/data-pipeline/core/src/test/resources/job-progress.yaml
@@ -23,10 +23,10 @@ incremental:
   position: ''
 inventory:
   finished:
-  - ds0.t_2
-  - ds0.t_1
+  - ds0.t_2#2
+  - ds0.t_1#1
   unfinished:
-    ds1.t_2: i,1,2
-    ds1.t_1: ''
+    ds1.t_1#1: ''
+    ds1.t_2#2: i,1,2
 sourceDatabaseType: H2
 status: RUNNING

Reply via email to