This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7345274fb6d Update pipeline code style (#37526)
7345274fb6d is described below
commit 7345274fb6d6c8f548f8b9682a0a1202c4342e57
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Dec 26 10:54:19 2025 +0800
Update pipeline code style (#37526)
* Rename sourceDataSource to dataSource and reorder method parameters
* Use QualifiedTable instead of separate schemaName and tableName
* Rename
---
.../position/InventoryPositionCalculator.java | 11 +++++------
.../InventoryPositionEstimatedCalculator.java | 12 ++++++------
.../exact/InventoryPositionExactCalculator.java | 20 ++++++++++----------
.../splitter/InventoryDumperContextSplitter.java | 7 ++++---
.../inventory/splitter/InventoryTaskSplitter.java | 6 +++---
5 files changed, 28 insertions(+), 28 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
index 3a64f704d4e..b1bbd93484e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import java.util.Collections;
import java.util.List;
@@ -37,11 +38,9 @@ import java.util.List;
@RequiredArgsConstructor
public final class InventoryPositionCalculator {
- private final PipelineDataSource sourceDataSource;
+ private final PipelineDataSource dataSource;
- private final String schemaName;
-
- private final String tableName;
+ private final QualifiedTable qualifiedTable;
private final List<PipelineColumnMetaData> uniqueKeyColumns;
@@ -55,11 +54,11 @@ public final class InventoryPositionCalculator {
* @return positions
*/
public List<IngestPosition> getPositions() {
- DialectDataTypeOption dataTypeOption = new
DatabaseTypeRegistry(sourceDataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
+ DialectDataTypeOption dataTypeOption = new
DatabaseTypeRegistry(dataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
String uniqueKey = uniqueKeyColumns.get(0).getName();
- QueryRange uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
schemaName, tableName, uniqueKey);
+ QueryRange uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(qualifiedTable,
uniqueKey, dataSource);
return
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
uniqueKeyValuesRange, shardingSize);
}
if (1 == uniqueKeyColumns.size() &&
dataTypeOption.isStringDataType(firstColumnDataType)) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 9216a35a9f1..aec8ef6ad2d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import java.math.BigInteger;
import java.sql.Connection;
@@ -46,16 +47,15 @@ public final class InventoryPositionEstimatedCalculator {
/**
* Get integer unique key values range.
*
- * @param dataSource data source
- * @param schemaName schema name
- * @param tableName table name
+ * @param qualifiedTable qualified table
* @param uniqueKey unique key
+ * @param dataSource data source
* @return unique key values range
* @throws SplitPipelineJobByUniqueKeyException if an error occurs while
getting unique key values range
*/
- public static QueryRange getIntegerUniqueKeyValuesRange(final
PipelineDataSource dataSource, final String schemaName, final String tableName,
final String uniqueKey) {
+ public static QueryRange getIntegerUniqueKeyValuesRange(final
QualifiedTable qualifiedTable, final String uniqueKey, final PipelineDataSource
dataSource) {
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
- String sql =
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(schemaName, tableName,
uniqueKey);
+ String sql =
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
@@ -63,7 +63,7 @@ public final class InventoryPositionEstimatedCalculator {
resultSet.next();
return QueryRange.closed(resultSet.getLong(1),
resultSet.getLong(2));
} catch (final SQLException ex) {
- throw new SplitPipelineJobByUniqueKeyException(tableName,
uniqueKey, ex);
+ throw new
SplitPipelineJobByUniqueKeyException(qualifiedTable.getTableName(), uniqueKey,
ex);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
index 0f8712ef837..dd8d3f2a40b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -48,26 +48,26 @@ public final class InventoryPositionExactCalculator {
* @param qualifiedTable qualified table
* @param uniqueKey unique key
* @param shardingSize sharding size
- * @param sourceDataSource source data source
+ * @param dataSource data source
* @param positionHandler position handler
* @return positions
* @throws SplitPipelineJobByUniqueKeyException if an error occurs while
splitting table by unique key
*/
public static <T> List<IngestPosition> getPositions(final QualifiedTable
qualifiedTable, final String uniqueKey, final int shardingSize,
- final
PipelineDataSource sourceDataSource, final DataTypePositionHandler<T>
positionHandler) {
+ final
PipelineDataSource dataSource, final DataTypePositionHandler<T>
positionHandler) {
List<IngestPosition> result = new LinkedList<>();
- PrimaryKeyIngestPosition<T> firstPosition =
getFirstPosition(qualifiedTable, uniqueKey, shardingSize, sourceDataSource,
positionHandler);
+ PrimaryKeyIngestPosition<T> firstPosition =
getFirstPosition(qualifiedTable, uniqueKey, shardingSize, dataSource,
positionHandler);
result.add(firstPosition);
- result.addAll(getLeftPositions(qualifiedTable, uniqueKey,
shardingSize, firstPosition, sourceDataSource, positionHandler));
+ result.addAll(getLeftPositions(qualifiedTable, uniqueKey,
shardingSize, firstPosition, dataSource, positionHandler));
return result;
}
private static <T> PrimaryKeyIngestPosition<T> getFirstPosition(final
QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
- final
PipelineDataSource sourceDataSource, final DataTypePositionHandler<T>
positionHandler) {
- String firstQuerySQL = new
PipelinePrepareSQLBuilder(sourceDataSource.getDatabaseType())
+ final
PipelineDataSource dataSource, final DataTypePositionHandler<T>
positionHandler) {
+ String firstQuerySQL = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType())
.buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName(), uniqueKey, false);
try (
- Connection connection = sourceDataSource.getConnection();
+ Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(firstQuerySQL)) {
preparedStatement.setLong(1, shardingSize);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -91,14 +91,14 @@ public final class InventoryPositionExactCalculator {
private static <T> List<IngestPosition> getLeftPositions(final
QualifiedTable qualifiedTable, final String uniqueKey,
final int
shardingSize, final PrimaryKeyIngestPosition<T> firstPosition,
- final
PipelineDataSource sourceDataSource, final DataTypePositionHandler<T>
positionHandler) {
+ final
PipelineDataSource dataSource, final DataTypePositionHandler<T>
positionHandler) {
List<IngestPosition> result = new LinkedList<>();
T lowerValue = firstPosition.getEndValue();
long recordsCount = 0;
- String laterQuerySQL = new
PipelinePrepareSQLBuilder(sourceDataSource.getDatabaseType())
+ String laterQuerySQL = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType())
.buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName(), uniqueKey, true);
try (
- Connection connection = sourceDataSource.getConnection();
+ Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(laterQuerySQL)) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
positionHandler.setPreparedStatementValue(preparedStatement,
1, lowerValue);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 1c377a279db..34fae406389 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColum
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.InventoryPositionCalculator;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import java.util.Collection;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public final class InventoryDumperContextSplitter {
- private final PipelineDataSource sourceDataSource;
+ private final PipelineDataSource dataSource;
private final InventoryDumperContext dumperContext;
@@ -103,14 +104,14 @@ public final class InventoryDumperContextSplitter {
return result;
}
}
- long tableRecordsCount =
InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext,
sourceDataSource);
+ long tableRecordsCount =
InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
if (!dumperContext.hasUniqueKey()) {
return Collections.singleton(new UnsupportedKeyIngestPosition());
}
String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
int shardingSize =
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
- return new InventoryPositionCalculator(sourceDataSource, schemaName,
dumperContext.getActualTableName(),
+ return new InventoryPositionCalculator(dataSource, new
QualifiedTable(schemaName, dumperContext.getActualTableName()),
dumperContext.getUniqueKeyColumns(), tableRecordsCount,
shardingSize).getPositions();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index 7716dfd5f52..d7e76f8fd8d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public final class InventoryTaskSplitter {
- private final PipelineDataSource sourceDataSource;
+ private final PipelineDataSource dataSource;
private final InventoryDumperContext dumperContext;
@@ -64,12 +64,12 @@ public final class InventoryTaskSplitter {
List<InventoryTask> result = new LinkedList<>();
long startTimeMillis = System.currentTimeMillis();
TransmissionProcessContext processContext =
jobItemContext.getJobProcessContext();
- InventoryDumperContextSplitter dumperContextSplitter = new
InventoryDumperContextSplitter(sourceDataSource, dumperContext);
+ InventoryDumperContextSplitter dumperContextSplitter = new
InventoryDumperContextSplitter(dataSource, dumperContext);
for (InventoryDumperContext each :
dumperContextSplitter.split(jobItemContext)) {
AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel =
InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(),
importerConfig.getBatchSize(), position);
InventoryDataRecordPositionCreator positionCreator =
each.hasUniqueKey() ? new UniqueKeyInventoryDataRecordPositionCreator() : new
PlaceholderInventoryDataRecordPositionCreator();
- Dumper dumper = new InventoryDumper(each, channel,
sourceDataSource, positionCreator);
+ Dumper dumper = new InventoryDumper(each, channel, dataSource,
positionCreator);
Importer importer = new SingleChannelConsumerImporter(channel,
importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
result.add(new
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),
processContext.getInventoryImporterExecuteEngine(), dumper, importer,
position));