This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a2b92042a0 Refactor InventoryDumperContextSplitter (#32596)
7a2b92042a0 is described below

commit 7a2b92042a0b62acdc66ba3ef30ed4547609484f
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Aug 19 21:21:21 2024 +0800

    Refactor InventoryDumperContextSplitter (#32596)
    
    * Refactor PipelineTaskUtils.generateInventoryTaskId
    
    * Refactor PipelineTaskUtils.generateInventoryTaskId
    
    * Refactor PipelineTaskUtils.generateInventoryTaskId
    
    * Refactor PipelineTaskUtils.generateInventoryTaskId
    
    * Refactor InventoryTask
    
    * Fix javadoc of InventoryDumperContextSplitter
    
    * Refactor InventoryDumperContextSplitter
    
    * Refactor InventoryDumperContextSplitter
---
 .../splitter/InventoryDumperContextSplitter.java   | 84 +++++++++++-----------
 .../inventory/splitter/InventoryTaskSplitter.java  |  2 +-
 .../data/pipeline/core/task/InventoryTask.java     |  2 +-
 .../data/pipeline/core/task/PipelineTaskUtils.java | 11 ++-
 .../pipeline/core/task/PipelineTaskUtilsTest.java  | 36 ++++++++++
 5 files changed, 86 insertions(+), 49 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 4ccdfd82b60..6a68d80d70e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter;
 
 import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.Range;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -31,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
@@ -39,6 +37,7 @@ import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgori
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
+import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -52,10 +51,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * Inventory task splitter.
+ * Inventory dumper context splitter.
  */
 @RequiredArgsConstructor
-@Slf4j
 public final class InventoryDumperContextSplitter {
     
     private final PipelineDataSourceWrapper sourceDataSource;
@@ -69,56 +67,46 @@ public final class InventoryDumperContextSplitter {
      * @return inventory dumper contexts
      */
     public Collection<InventoryDumperContext> split(final TransmissionJobItemContext jobItemContext) {
-        return splitByTable(dumperContext).stream().flatMap(each -> splitByPrimaryKey(each, jobItemContext, sourceDataSource).stream()).collect(Collectors.toList());
+        return splitByTable().stream().flatMap(each -> splitByPrimaryKey(each, jobItemContext).stream()).collect(Collectors.toList());
     }
     
-    private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
-        Collection<InventoryDumperContext> result = new LinkedList<>();
-        dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
-            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
-            // use original table name, for metadata loader, since some database table name case-sensitive
-            inventoryDumperContext.setActualTableName(key.toString());
-            inventoryDumperContext.setLogicTableName(value.toString());
-            inventoryDumperContext.getCommonContext().setPosition(new IngestPlaceholderPosition());
-            inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
-            inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
-            result.add(inventoryDumperContext);
-        });
+    private Collection<InventoryDumperContext> splitByTable() {
+        return dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().entrySet()
+                .stream().map(entry -> createTableSplitDumperContext(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+    }
+    
+    private InventoryDumperContext createTableSplitDumperContext(final CaseInsensitiveIdentifier actualTableName, final CaseInsensitiveIdentifier logicTableName) {
+        InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
+        // Use the original table name for the metadata loader, since table names are case-sensitive in some databases.
+        result.setActualTableName(actualTableName.toString());
+        result.setLogicTableName(logicTableName.toString());
+        result.getCommonContext().setPosition(new IngestPlaceholderPosition());
+        result.setInsertColumnNames(dumperContext.getInsertColumnNames());
+        result.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
         return result;
     }
     
-    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
-                                                                 final PipelineDataSourceWrapper dataSource) {
+    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
         if (null == dumperContext.getUniqueKeyColumns()) {
-            String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
-            String actualTableName = dumperContext.getActualTableName();
-            List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
-            dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
+            dumperContext.setUniqueKeyColumns(getTableUniqueKeys(dumperContext, jobItemContext));
         }
         Collection<InventoryDumperContext> result = new LinkedList<>();
         TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
-        PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfiguration().getRead();
-        int batchSize = readConfig.getBatchSize();
+        int batchSize = jobProcessContext.getProcessConfiguration().getRead().getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
-        Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
         int i = 0;
-        for (IngestPosition each : inventoryPositions) {
-            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
-            splitDumperContext.getCommonContext().setPosition(each);
-            splitDumperContext.setShardingItem(i++);
-            splitDumperContext.setActualTableName(dumperContext.getActualTableName());
-            splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
-            splitDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
-            splitDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
-            splitDumperContext.setBatchSize(batchSize);
-            splitDumperContext.setRateLimitAlgorithm(rateLimitAlgorithm);
-            result.add(splitDumperContext);
+        for (IngestPosition each : getInventoryPositions(dumperContext, jobItemContext)) {
+            result.add(createPrimaryKeySplitDumperContext(dumperContext, each, i++, batchSize, rateLimitAlgorithm));
         }
         return result;
     }
     
-    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
-                                                             final PipelineDataSourceWrapper dataSource) {
+    private List<PipelineColumnMetaData> getTableUniqueKeys(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
+        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        return PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, dumperContext.getActualTableName(), jobItemContext.getSourceMetaDataLoader());
+    }
+    
+    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext) {
         TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress();
         if (null != initProgress) {
             // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
@@ -127,7 +115,7 @@ public final class InventoryDumperContextSplitter {
                 return result;
             }
         }
-        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
+        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, sourceDataSource);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         if (!dumperContext.hasUniqueKey()) {
             return Collections.singleton(new UnsupportedKeyIngestPosition());
@@ -136,7 +124,7 @@ public final class InventoryDumperContextSplitter {
         if (1 == uniqueKeyColumns.size()) {
             int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
             if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
-                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, dataSource);
+                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, sourceDataSource);
             }
             if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
                 // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
@@ -179,4 +167,18 @@ public final class InventoryDumperContextSplitter {
             throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
         }
     }
+    
+    private InventoryDumperContext createPrimaryKeySplitDumperContext(final InventoryDumperContext dumperContext, final IngestPosition position,
+                                                                      final int shardingItem, final int batchSize, final JobRateLimitAlgorithm rateLimitAlgorithm) {
+        InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
+        result.getCommonContext().setPosition(position);
+        result.setShardingItem(shardingItem);
+        result.setActualTableName(dumperContext.getActualTableName());
+        result.setLogicTableName(dumperContext.getLogicTableName());
+        result.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
+        result.setInsertColumnNames(dumperContext.getInsertColumnNames());
+        result.setBatchSize(batchSize);
+        result.setRateLimitAlgorithm(rateLimitAlgorithm);
+        return result;
+    }
 }
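
Taken together, the refactoring makes the split flow read as two passes: split() first fans out one dumper context per table, then fans each of those out per primary key range. Below is a minimal sketch of how a caller might drive the splitter, assuming the Lombok-generated constructor takes (sourceDataSource, dumperContext) in field-declaration order and that the surrounding pipeline preparer has already built all three arguments:

    // Sketch only, not part of this commit.
    InventoryDumperContextSplitter splitter = new InventoryDumperContextSplitter(sourceDataSource, dumperContext);
    for (InventoryDumperContext each : splitter.split(jobItemContext)) {
        // Each split context carries its own table name, ingest position,
        // sharding item, batch size and rate limit algorithm.
        String taskId = PipelineTaskUtils.generateInventoryTaskId(each);
    }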
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index e3db30855f4..034081e5203 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -70,7 +70,7 @@ public final class InventoryTaskSplitter {
             result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
                     processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
         }
-        log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        log.info("Split inventory tasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
         return result;
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 19d88aea7a9..afe20fc1e3d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * Inventory task.
  */
 @RequiredArgsConstructor
-@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "dumper", "importer"})
+@ToString(of = {"taskId", "position"})
 public final class InventoryTask implements PipelineTask {
     
     @Getter
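
For reference, Lombok's @ToString(exclude = ...) blacklists the named fields, while @ToString(of = ...) whitelists them: only the listed fields appear in the generated toString(), and the output stays stable if new fields are added later. A self-contained sketch of the whitelist form (the demo class and values are hypothetical, not from this commit):

    import lombok.RequiredArgsConstructor;
    import lombok.ToString;
    
    // Only taskId and position appear in toString(); dumper is omitted.
    @RequiredArgsConstructor
    @ToString(of = {"taskId", "position"})
    final class ToStringDemo {
        
        private final String taskId;
        
        private final Object position;
        
        private final Object dumper;
    }
    
    // new ToStringDemo("foo_ds.foo_tbl#0", "pos", new Object()).toString()
    // yields "ToStringDemo(taskId=foo_ds.foo_tbl#0, position=pos)"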
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index a29460f6fef..a4ae090d786 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -33,14 +33,13 @@ import java.util.Optional;
 public final class PipelineTaskUtils {
     
     /**
-     * Generate inventory task id.
+     * Generate inventory task ID.
      *
-     * @param inventoryDumperContext inventory dumper context
-     * @return inventory task id
+     * @param dumperContext inventory dumper context
+     * @return generated ID
      */
-    public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
-        String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName());
-        return result + "#" + inventoryDumperContext.getShardingItem();
+    public static String generateInventoryTaskId(final InventoryDumperContext dumperContext) {
+        return String.format("%s.%s#%s", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName(), dumperContext.getShardingItem());
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
new file mode 100644
index 00000000000..903e99ec83a
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.task;
+
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class PipelineTaskUtilsTest {
+    
+    @Test
+    void assertGenerateInventoryTaskId() {
+        InventoryDumperContext dumperContext = new InventoryDumperContext(new DumperCommonContext("foo_ds", null, null, null));
+        dumperContext.setActualTableName("foo_actual_tbl");
+        dumperContext.setShardingItem(1);
+        assertThat(PipelineTaskUtils.generateInventoryTaskId(dumperContext), is("foo_ds.foo_actual_tbl#1"));
+    }
+}
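
As a quick sanity check on the generateInventoryTaskId change (illustration only, with hypothetical values), the removed two-step concatenation and the new single format call produce the same ID:

    // Both expressions evaluate to "foo_ds.foo_actual_tbl#1".
    String before = String.format("%s.%s", "foo_ds", "foo_actual_tbl") + "#" + 1;
    String after = String.format("%s.%s#%s", "foo_ds", "foo_actual_tbl", 1);
    assert before.equals(after);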
