This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 814337927a1 Extract InventoryPositionCalculator (#37520)
814337927a1 is described below
commit 814337927a117817481536087f5d2a86e73d1d34
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 25 20:26:39 2025 +0800
Extract InventoryPositionCalculator (#37520)
---
.../position/InventoryPositionCalculator.java | 70 ++++++++++++++++++++++
.../splitter/InventoryDumperContextSplitter.java | 28 +++------
2 files changed, 77 insertions(+), 21 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
new file mode 100644
index 00000000000..664842a6e33
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.Range;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
+import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
+import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Inventory position calculator.
+ */
+@RequiredArgsConstructor
+public final class InventoryPositionCalculator {
+
+ private final PipelineDataSource sourceDataSource;
+
+ private final String schemaName;
+
+ private final String tableName;
+
+ private final List<PipelineColumnMetaData> uniqueKeyColumns;
+
+ private final long tableRecordsCount;
+
+ private final int shardingSize;
+
+ /**
+ * Get positions.
+ *
+ * @return positions
+ */
+ public List<IngestPosition> getPositions() {
+ DialectDataTypeOption dataTypeOption = new
DatabaseTypeRegistry(sourceDataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
+ int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
+ if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
+ String uniqueKey = uniqueKeyColumns.get(0).getName();
+ Range<Long> uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
schemaName, tableName, uniqueKey);
+ return
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
uniqueKeyValuesRange, shardingSize);
+ }
+ if (1 == uniqueKeyColumns.size() &&
dataTypeOption.isStringDataType(firstColumnDataType)) {
+ return Collections.singletonList(new
StringPrimaryKeyIngestPosition(null, null));
+ }
+ return Collections.singletonList(new UnsupportedKeyIngestPosition());
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index b8c262355af..1c377a279db 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -18,23 +18,19 @@
package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter;
import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang3.Range;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.InventoryPositionCalculator;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
-import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import java.util.Collection;
@@ -87,7 +83,7 @@ public final class InventoryDumperContextSplitter {
int batchSize =
jobProcessContext.getProcessConfiguration().getRead().getBatchSize();
JobRateLimitAlgorithm rateLimitAlgorithm =
jobProcessContext.getReadRateLimitAlgorithm();
int i = 0;
- for (IngestPosition each : getInventoryPositions(dumperContext,
jobItemContext)) {
+ for (IngestPosition each : getPositions(dumperContext,
jobItemContext)) {
result.add(createPrimaryKeySplitDumperContext(dumperContext, each,
i++, batchSize, rateLimitAlgorithm));
}
return result;
@@ -98,7 +94,7 @@ public final class InventoryDumperContextSplitter {
return PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName,
dumperContext.getActualTableName(), jobItemContext.getSourceMetaDataLoader());
}
- private Collection<IngestPosition> getInventoryPositions(final
InventoryDumperContext dumperContext, final TransmissionJobItemContext
jobItemContext) {
+ private Collection<IngestPosition> getPositions(final
InventoryDumperContext dumperContext, final TransmissionJobItemContext
jobItemContext) {
TransmissionJobItemProgress initProgress =
jobItemContext.getInitProgress();
if (null != initProgress) {
// Do NOT filter FinishedPosition here, since whole inventory
tasks are required in job progress when persisting to register center.
@@ -112,20 +108,10 @@ public final class InventoryDumperContextSplitter {
if (!dumperContext.hasUniqueKey()) {
return Collections.singleton(new UnsupportedKeyIngestPosition());
}
- DialectDataTypeOption dataTypeOption = new
DatabaseTypeRegistry(sourceDataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
- List<PipelineColumnMetaData> uniqueKeyColumns =
dumperContext.getUniqueKeyColumns();
- int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
- if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
- String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
- String uniqueKey =
dumperContext.getUniqueKeyColumns().get(0).getName();
- Range<Long> uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
schemaName, dumperContext.getActualTableName(), uniqueKey);
- int shardingSize =
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
- return
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
uniqueKeyValuesRange, shardingSize);
- }
- if (1 == uniqueKeyColumns.size() &&
dataTypeOption.isStringDataType(firstColumnDataType)) {
- return Collections.singleton(new
StringPrimaryKeyIngestPosition(null, null));
- }
- return Collections.singleton(new UnsupportedKeyIngestPosition());
+ String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+ int shardingSize =
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
+ return new InventoryPositionCalculator(sourceDataSource, schemaName,
dumperContext.getActualTableName(),
+ dumperContext.getUniqueKeyColumns(), tableRecordsCount,
shardingSize).getPositions();
}
private InventoryDumperContext createPrimaryKeySplitDumperContext(final
InventoryDumperContext dumperContext, final IngestPosition position,