This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8a18ad2b1ae Extract
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange (#37519)
8a18ad2b1ae is described below
commit 8a18ad2b1ae3709d3e682085f57001c4d5db65f7
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 25 19:57:19 2025 +0800
Extract InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange
(#37519)
---
.../InventoryPositionEstimatedCalculator.java | 31 ++++++++++++++++++++++
.../splitter/InventoryDumperContextSplitter.java | 26 +++---------------
2 files changed, 34 insertions(+), 23 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 1fd5afec071..322711990cd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -20,11 +20,18 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.Range;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
import java.math.BigInteger;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -35,6 +42,30 @@ import java.util.List;
@NoArgsConstructor(access = AccessLevel.NONE)
public final class InventoryPositionEstimatedCalculator {
+ /**
+ * Get integer unique key values range.
+ *
+ * @param dataSource data source
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param uniqueKey unique key
+ * @return unique key values range
+ * @throws SplitPipelineJobByUniqueKeyException if an error occurs while
getting unique key values range
+ */
+ public static Range<Long> getIntegerUniqueKeyValuesRange(final
PipelineDataSource dataSource, final String schemaName, final String tableName,
final String uniqueKey) {
+ PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
+ String sql =
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(schemaName, tableName,
uniqueKey);
+ try (
+ Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+ resultSet.next();
+ return Range.of(resultSet.getLong(1), resultSet.getLong(2));
+ } catch (final SQLException ex) {
+ throw new SplitPipelineJobByUniqueKeyException(tableName,
uniqueKey, ex);
+ }
+ }
+
/**
* Get positions by integer unique key range.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 32da7a3c7dd..b8c262355af 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -22,7 +22,6 @@ import org.apache.commons.lang3.Range;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
@@ -34,15 +33,10 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -122,7 +116,9 @@ public final class InventoryDumperContextSplitter {
List<PipelineColumnMetaData> uniqueKeyColumns =
dumperContext.getUniqueKeyColumns();
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
- Range<Long> uniqueKeyValuesRange =
getUniqueKeyValuesRange(jobItemContext, dumperContext);
+ String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+ String uniqueKey =
dumperContext.getUniqueKeyColumns().get(0).getName();
+ Range<Long> uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
schemaName, dumperContext.getActualTableName(), uniqueKey);
int shardingSize =
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
return
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
uniqueKeyValuesRange, shardingSize);
}
@@ -132,22 +128,6 @@ public final class InventoryDumperContextSplitter {
return Collections.singleton(new UnsupportedKeyIngestPosition());
}
- private Range<Long> getUniqueKeyValuesRange(final
TransmissionJobItemContext jobItemContext, final InventoryDumperContext
dumperContext) {
- String uniqueKey =
dumperContext.getUniqueKeyColumns().get(0).getName();
- PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
- String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
-
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
dumperContext.getActualTableName(), uniqueKey);
- try (
- Connection connection = sourceDataSource.getConnection();
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(sql)) {
- resultSet.next();
- return Range.of(resultSet.getLong(1), resultSet.getLong(2));
- } catch (final SQLException ex) {
- throw new
SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(),
uniqueKey, ex);
- }
- }
-
private InventoryDumperContext createPrimaryKeySplitDumperContext(final
InventoryDumperContext dumperContext, final IngestPosition position,
final
int shardingItem, final int batchSize, final JobRateLimitAlgorithm
rateLimitAlgorithm) {
InventoryDumperContext result = new
InventoryDumperContext(dumperContext.getCommonContext());