This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e00383f7363 Refactor InventoryDumperContextSplitter (#32597)
e00383f7363 is described below
commit e00383f736322b90db9ff72a6dfbf7829f04d5fb
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 20 09:47:35 2024 +0800
Refactor InventoryDumperContextSplitter (#32597)
---
.../inventory/splitter/InventoryDumperContextSplitter.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 6a68d80d70e..7b64da57f4d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -39,7 +39,6 @@ import
org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
-import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -124,7 +123,7 @@ public final class InventoryDumperContextSplitter {
if (1 == uniqueKeyColumns.size()) {
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
- return getPositionByIntegerUniqueKeyRange(dumperContext,
tableRecordsCount, jobItemContext, sourceDataSource);
+ return getPositionByIntegerUniqueKeyRange(dumperContext,
tableRecordsCount, jobItemContext);
}
if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
// TODO Support string unique key table splitting. Ascii
characters ordering are different in different versions of databases.
@@ -134,13 +133,12 @@ public final class InventoryDumperContextSplitter {
return Collections.singleton(new UnsupportedKeyIngestPosition());
}
- private Collection<IngestPosition>
getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext,
final long tableRecordsCount,
-
final TransmissionJobItemContext jobItemContext, final
PipelineDataSourceWrapper dataSource) {
+ private Collection<IngestPosition>
getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext,
final long tableRecordsCount, final TransmissionJobItemContext jobItemContext) {
if (0L == tableRecordsCount) {
return Collections.singletonList(new
IntegerPrimaryKeyIngestPosition(0L, 0L));
}
Collection<IngestPosition> result = new LinkedList<>();
- Range<Long> uniqueKeyValuesRange =
getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext);
+ Range<Long> uniqueKeyValuesRange =
getUniqueKeyValuesRange(jobItemContext, dumperContext);
int shardingSize =
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
long splitCount = tableRecordsCount / shardingSize +
(tableRecordsCount % shardingSize > 0L ? 1 : 0);
long interval = (uniqueKeyValuesRange.getMaximum() -
uniqueKeyValuesRange.getMinimum()) / splitCount;
@@ -152,13 +150,13 @@ public final class InventoryDumperContextSplitter {
return result;
}
- private Range<Long> getUniqueKeyValuesRange(final
TransmissionJobItemContext jobItemContext, final DataSource dataSource, final
InventoryDumperContext dumperContext) {
+ private Range<Long> getUniqueKeyValuesRange(final
TransmissionJobItemContext jobItemContext, final InventoryDumperContext
dumperContext) {
String uniqueKey =
dumperContext.getUniqueKeyColumns().get(0).getName();
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
dumperContext.getActualTableName(), uniqueKey);
try (
- Connection connection = dataSource.getConnection();
+ Connection connection = sourceDataSource.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
resultSet.next();