This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a18ad2b1ae Extract InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange (#37519)
8a18ad2b1ae is described below

commit 8a18ad2b1ae3709d3e682085f57001c4d5db65f7
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 25 19:57:19 2025 +0800

    Extract InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange (#37519)
---
 .../InventoryPositionEstimatedCalculator.java      | 31 ++++++++++++++++++++++
 .../splitter/InventoryDumperContextSplitter.java   | 26 +++---------------
 2 files changed, 34 insertions(+), 23 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 1fd5afec071..322711990cd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -20,11 +20,18 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.Range;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
 
 import java.math.BigInteger;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,6 +42,30 @@ import java.util.List;
 @NoArgsConstructor(access = AccessLevel.NONE)
 public final class InventoryPositionEstimatedCalculator {
     
+    /**
+     * Get integer unique key values range.
+     *
+     * @param dataSource data source
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param uniqueKey unique key
+     * @return unique key values range
+     * @throws SplitPipelineJobByUniqueKeyException if an error occurs while 
getting unique key values range
+     */
+    public static Range<Long> getIntegerUniqueKeyValuesRange(final 
PipelineDataSource dataSource, final String schemaName, final String tableName, 
final String uniqueKey) {
+        PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
+        String sql = 
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(schemaName, tableName, 
uniqueKey);
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
+            resultSet.next();
+            return Range.of(resultSet.getLong(1), resultSet.getLong(2));
+        } catch (final SQLException ex) {
+            throw new SplitPipelineJobByUniqueKeyException(tableName, 
uniqueKey, ex);
+        }
+    }
+    
     /**
      * Get positions by integer unique key range.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 32da7a3c7dd..b8c262355af 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -22,7 +22,6 @@ import org.apache.commons.lang3.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
@@ -34,15 +33,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
 import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -122,7 +116,9 @@ public final class InventoryDumperContextSplitter {
         List<PipelineColumnMetaData> uniqueKeyColumns = 
dumperContext.getUniqueKeyColumns();
         int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
         if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
-            Range<Long> uniqueKeyValuesRange = 
getUniqueKeyValuesRange(jobItemContext, dumperContext);
+            String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+            String uniqueKey = 
dumperContext.getUniqueKeyColumns().get(0).getName();
+            Range<Long> uniqueKeyValuesRange = 
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
 schemaName, dumperContext.getActualTableName(), uniqueKey);
             int shardingSize = 
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
             return 
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount, 
uniqueKeyValuesRange, shardingSize);
         }
@@ -132,22 +128,6 @@ public final class InventoryDumperContextSplitter {
         return Collections.singleton(new UnsupportedKeyIngestPosition());
     }
     
-    private Range<Long> getUniqueKeyValuesRange(final 
TransmissionJobItemContext jobItemContext, final InventoryDumperContext 
dumperContext) {
-        String uniqueKey = 
dumperContext.getUniqueKeyColumns().get(0).getName();
-        PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
-        String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
-                
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
 dumperContext.getActualTableName(), uniqueKey);
-        try (
-                Connection connection = sourceDataSource.getConnection();
-                Statement statement = connection.createStatement();
-                ResultSet resultSet = statement.executeQuery(sql)) {
-            resultSet.next();
-            return Range.of(resultSet.getLong(1), resultSet.getLong(2));
-        } catch (final SQLException ex) {
-            throw new 
SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), 
uniqueKey, ex);
-        }
-    }
-    
     private InventoryDumperContext createPrimaryKeySplitDumperContext(final 
InventoryDumperContext dumperContext, final IngestPosition position,
                                                                       final 
int shardingItem, final int batchSize, final JobRateLimitAlgorithm 
rateLimitAlgorithm) {
         InventoryDumperContext result = new 
InventoryDumperContext(dumperContext.getCommonContext());

Reply via email to