This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 814337927a1 Extract InventoryPositionCalculator (#37520)
814337927a1 is described below

commit 814337927a117817481536087f5d2a86e73d1d34
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 25 20:26:39 2025 +0800

    Extract InventoryPositionCalculator (#37520)
---
 .../position/InventoryPositionCalculator.java      | 70 ++++++++++++++++++++++
 .../splitter/InventoryDumperContextSplitter.java   | 28 +++------
 2 files changed, 77 insertions(+), 21 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
new file mode 100644
index 00000000000..664842a6e33
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.Range;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
+import 
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
+import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Inventory position calculator.
+ */
+@RequiredArgsConstructor
+public final class InventoryPositionCalculator {
+    
+    private final PipelineDataSource sourceDataSource;
+    
+    private final String schemaName;
+    
+    private final String tableName;
+    
+    private final List<PipelineColumnMetaData> uniqueKeyColumns;
+    
+    private final long tableRecordsCount;
+    
+    private final int shardingSize;
+    
+    /**
+     * Get positions.
+     *
+     * @return positions
+     */
+    public List<IngestPosition> getPositions() {
+        DialectDataTypeOption dataTypeOption = new 
DatabaseTypeRegistry(sourceDataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
+        int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
+        if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
+            String uniqueKey = uniqueKeyColumns.get(0).getName();
+            Range<Long> uniqueKeyValuesRange = 
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
 schemaName, tableName, uniqueKey);
+            return 
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount, 
uniqueKeyValuesRange, shardingSize);
+        }
+        if (1 == uniqueKeyColumns.size() && 
dataTypeOption.isStringDataType(firstColumnDataType)) {
+            return Collections.singletonList(new 
StringPrimaryKeyIngestPosition(null, null));
+        }
+        return Collections.singletonList(new UnsupportedKeyIngestPosition());
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index b8c262355af..1c377a279db 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -18,23 +18,19 @@
 package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang3.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.InventoryPositionCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
-import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.Collection;
@@ -87,7 +83,7 @@ public final class InventoryDumperContextSplitter {
         int batchSize = 
jobProcessContext.getProcessConfiguration().getRead().getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = 
jobProcessContext.getReadRateLimitAlgorithm();
         int i = 0;
-        for (IngestPosition each : getInventoryPositions(dumperContext, 
jobItemContext)) {
+        for (IngestPosition each : getPositions(dumperContext, 
jobItemContext)) {
             result.add(createPrimaryKeySplitDumperContext(dumperContext, each, 
i++, batchSize, rateLimitAlgorithm));
         }
         return result;
@@ -98,7 +94,7 @@ public final class InventoryDumperContextSplitter {
         return PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, 
dumperContext.getActualTableName(), jobItemContext.getSourceMetaDataLoader());
     }
     
-    private Collection<IngestPosition> getInventoryPositions(final 
InventoryDumperContext dumperContext, final TransmissionJobItemContext 
jobItemContext) {
+    private Collection<IngestPosition> getPositions(final 
InventoryDumperContext dumperContext, final TransmissionJobItemContext 
jobItemContext) {
         TransmissionJobItemProgress initProgress = 
jobItemContext.getInitProgress();
         if (null != initProgress) {
             // Do NOT filter FinishedPosition here, since whole inventory 
tasks are required in job progress when persisting to register center.
@@ -112,20 +108,10 @@ public final class InventoryDumperContextSplitter {
         if (!dumperContext.hasUniqueKey()) {
             return Collections.singleton(new UnsupportedKeyIngestPosition());
         }
-        DialectDataTypeOption dataTypeOption = new 
DatabaseTypeRegistry(sourceDataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
-        List<PipelineColumnMetaData> uniqueKeyColumns = 
dumperContext.getUniqueKeyColumns();
-        int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
-        if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
-            String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
-            String uniqueKey = 
dumperContext.getUniqueKeyColumns().get(0).getName();
-            Range<Long> uniqueKeyValuesRange = 
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
 schemaName, dumperContext.getActualTableName(), uniqueKey);
-            int shardingSize = 
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
-            return 
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount, 
uniqueKeyValuesRange, shardingSize);
-        }
-        if (1 == uniqueKeyColumns.size() && 
dataTypeOption.isStringDataType(firstColumnDataType)) {
-            return Collections.singleton(new 
StringPrimaryKeyIngestPosition(null, null));
-        }
-        return Collections.singleton(new UnsupportedKeyIngestPosition());
+        String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        int shardingSize = 
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
+        return new InventoryPositionCalculator(sourceDataSource, schemaName, 
dumperContext.getActualTableName(),
+                dumperContext.getUniqueKeyColumns(), tableRecordsCount, 
shardingSize).getPositions();
     }
     
     private InventoryDumperContext createPrimaryKeySplitDumperContext(final 
InventoryDumperContext dumperContext, final IngestPosition position,

Reply via email to