This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7345274fb6d Update pipeline code style (#37526)
7345274fb6d is described below

commit 7345274fb6d6c8f548f8b9682a0a1202c4342e57
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Dec 26 10:54:19 2025 +0800

    Update pipeline code style (#37526)
    
    * Rename and param ordering
    
    * Use QualifiedTable
    
    * Rename
---
 .../position/InventoryPositionCalculator.java        | 11 +++++------
 .../InventoryPositionEstimatedCalculator.java        | 12 ++++++------
 .../exact/InventoryPositionExactCalculator.java      | 20 ++++++++++----------
 .../splitter/InventoryDumperContextSplitter.java     |  7 ++++---
 .../inventory/splitter/InventoryTaskSplitter.java    |  6 +++---
 5 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
index 3a64f704d4e..b1bbd93484e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
 import 
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
 import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 import java.util.Collections;
 import java.util.List;
@@ -37,11 +38,9 @@ import java.util.List;
 @RequiredArgsConstructor
 public final class InventoryPositionCalculator {
     
-    private final PipelineDataSource sourceDataSource;
+    private final PipelineDataSource dataSource;
     
-    private final String schemaName;
-    
-    private final String tableName;
+    private final QualifiedTable qualifiedTable;
     
     private final List<PipelineColumnMetaData> uniqueKeyColumns;
     
@@ -55,11 +54,11 @@ public final class InventoryPositionCalculator {
      * @return positions
      */
     public List<IngestPosition> getPositions() {
-        DialectDataTypeOption dataTypeOption = new 
DatabaseTypeRegistry(sourceDataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
+        DialectDataTypeOption dataTypeOption = new 
DatabaseTypeRegistry(dataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
         int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
         if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
             String uniqueKey = uniqueKeyColumns.get(0).getName();
-            QueryRange uniqueKeyValuesRange = InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource, schemaName, tableName, uniqueKey);
+            QueryRange uniqueKeyValuesRange = InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(qualifiedTable, uniqueKey, dataSource);
             return 
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount, 
uniqueKeyValuesRange, shardingSize);
         }
         if (1 == uniqueKeyColumns.size() && 
dataTypeOption.isStringDataType(firstColumnDataType)) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 9216a35a9f1..aec8ef6ad2d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 import java.math.BigInteger;
 import java.sql.Connection;
@@ -46,16 +47,15 @@ public final class InventoryPositionEstimatedCalculator {
     /**
      * Get integer unique key values range.
      *
-     * @param dataSource data source
-     * @param schemaName schema name
-     * @param tableName table name
+     * @param qualifiedTable qualified table
      * @param uniqueKey unique key
+     * @param dataSource data source
      * @return unique key values range
      * @throws SplitPipelineJobByUniqueKeyException if an error occurs while 
getting unique key values range
      */
-    public static QueryRange getIntegerUniqueKeyValuesRange(final 
PipelineDataSource dataSource, final String schemaName, final String tableName, 
final String uniqueKey) {
+    public static QueryRange getIntegerUniqueKeyValuesRange(final 
QualifiedTable qualifiedTable, final String uniqueKey, final PipelineDataSource 
dataSource) {
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
-        String sql = 
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(schemaName, tableName, 
uniqueKey);
+        String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(qualifiedTable.getSchemaName(), qualifiedTable.getTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement();
@@ -63,7 +63,7 @@ public final class InventoryPositionEstimatedCalculator {
             resultSet.next();
             return QueryRange.closed(resultSet.getLong(1), 
resultSet.getLong(2));
         } catch (final SQLException ex) {
-            throw new SplitPipelineJobByUniqueKeyException(tableName, 
uniqueKey, ex);
+            throw new 
SplitPipelineJobByUniqueKeyException(qualifiedTable.getTableName(), uniqueKey, 
ex);
         }
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
index 0f8712ef837..dd8d3f2a40b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -48,26 +48,26 @@ public final class InventoryPositionExactCalculator {
      * @param qualifiedTable qualified table
      * @param uniqueKey unique key
      * @param shardingSize sharding size
-     * @param sourceDataSource source data source
+     * @param dataSource data source
      * @param positionHandler position handler
      * @return positions
      * @throws SplitPipelineJobByUniqueKeyException if an error occurs while 
splitting table by unique key
      */
     public static <T> List<IngestPosition> getPositions(final QualifiedTable 
qualifiedTable, final String uniqueKey, final int shardingSize,
-                                                        final 
PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> 
positionHandler) {
+                                                        final 
PipelineDataSource dataSource, final DataTypePositionHandler<T> 
positionHandler) {
         List<IngestPosition> result = new LinkedList<>();
-        PrimaryKeyIngestPosition<T> firstPosition = 
getFirstPosition(qualifiedTable, uniqueKey, shardingSize, sourceDataSource, 
positionHandler);
+        PrimaryKeyIngestPosition<T> firstPosition = 
getFirstPosition(qualifiedTable, uniqueKey, shardingSize, dataSource, 
positionHandler);
         result.add(firstPosition);
-        result.addAll(getLeftPositions(qualifiedTable, uniqueKey, 
shardingSize, firstPosition, sourceDataSource, positionHandler));
+        result.addAll(getLeftPositions(qualifiedTable, uniqueKey, 
shardingSize, firstPosition, dataSource, positionHandler));
         return result;
     }
     
     private static <T> PrimaryKeyIngestPosition<T> getFirstPosition(final 
QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
-                                                                    final 
PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> 
positionHandler) {
-        String firstQuerySQL = new 
PipelinePrepareSQLBuilder(sourceDataSource.getDatabaseType())
+                                                                    final 
PipelineDataSource dataSource, final DataTypePositionHandler<T> 
positionHandler) {
+        String firstQuerySQL = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType())
                 
.buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName(), uniqueKey, false);
         try (
-                Connection connection = sourceDataSource.getConnection();
+                Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(firstQuerySQL)) {
             preparedStatement.setLong(1, shardingSize);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -91,14 +91,14 @@ public final class InventoryPositionExactCalculator {
     
     private static <T> List<IngestPosition> getLeftPositions(final 
QualifiedTable qualifiedTable, final String uniqueKey,
                                                              final int 
shardingSize, final PrimaryKeyIngestPosition<T> firstPosition,
-                                                             final 
PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> 
positionHandler) {
+                                                             final 
PipelineDataSource dataSource, final DataTypePositionHandler<T> 
positionHandler) {
         List<IngestPosition> result = new LinkedList<>();
         T lowerValue = firstPosition.getEndValue();
         long recordsCount = 0;
-        String laterQuerySQL = new 
PipelinePrepareSQLBuilder(sourceDataSource.getDatabaseType())
+        String laterQuerySQL = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType())
                 
.buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName(), uniqueKey, true);
         try (
-                Connection connection = sourceDataSource.getConnection();
+                Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(laterQuerySQL)) {
             for (int i = 0; i < Integer.MAX_VALUE; i++) {
                 positionHandler.setPreparedStatementValue(preparedStatement, 
1, lowerValue);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 1c377a279db..34fae406389 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.InventoryPositionCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.Collection;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public final class InventoryDumperContextSplitter {
     
-    private final PipelineDataSource sourceDataSource;
+    private final PipelineDataSource dataSource;
     
     private final InventoryDumperContext dumperContext;
     
@@ -103,14 +104,14 @@ public final class InventoryDumperContextSplitter {
                 return result;
             }
         }
-        long tableRecordsCount = 
InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, 
sourceDataSource);
+        long tableRecordsCount = 
InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         if (!dumperContext.hasUniqueKey()) {
             return Collections.singleton(new UnsupportedKeyIngestPosition());
         }
         String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         int shardingSize = 
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
-        return new InventoryPositionCalculator(sourceDataSource, schemaName, 
dumperContext.getActualTableName(),
+        return new InventoryPositionCalculator(dataSource, new 
QualifiedTable(schemaName, dumperContext.getActualTableName()),
                 dumperContext.getUniqueKeyColumns(), tableRecordsCount, 
shardingSize).getPositions();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index 7716dfd5f52..d7e76f8fd8d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public final class InventoryTaskSplitter {
     
-    private final PipelineDataSource sourceDataSource;
+    private final PipelineDataSource dataSource;
     
     private final InventoryDumperContext dumperContext;
     
@@ -64,12 +64,12 @@ public final class InventoryTaskSplitter {
         List<InventoryTask> result = new LinkedList<>();
         long startTimeMillis = System.currentTimeMillis();
         TransmissionProcessContext processContext = 
jobItemContext.getJobProcessContext();
-        InventoryDumperContextSplitter dumperContextSplitter = new 
InventoryDumperContextSplitter(sourceDataSource, dumperContext);
+        InventoryDumperContextSplitter dumperContextSplitter = new 
InventoryDumperContextSplitter(dataSource, dumperContext);
         for (InventoryDumperContext each : 
dumperContextSplitter.split(jobItemContext)) {
             AtomicReference<IngestPosition> position = new 
AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
             InventoryDataRecordPositionCreator positionCreator = 
each.hasUniqueKey() ? new UniqueKeyInventoryDataRecordPositionCreator() : new 
PlaceholderInventoryDataRecordPositionCreator();
-            Dumper dumper = new InventoryDumper(each, channel, 
sourceDataSource, positionCreator);
+            Dumper dumper = new InventoryDumper(each, channel, dataSource, 
positionCreator);
             Importer importer = new SingleChannelConsumerImporter(channel, 
importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
             result.add(new 
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
                     processContext.getInventoryDumperExecuteEngine(), 
processContext.getInventoryImporterExecuteEngine(), dumper, importer, 
position));

Reply via email to