This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 38ead32b4dc Fix the mismatch between processed record count and actual
data count for inventory dumper (#33996)
38ead32b4dc is described below
commit 38ead32b4dcbf144c3aecf73ef4364a282abf904
Author: Haoran Meng <[email protected]>
AuthorDate: Wed Dec 11 10:03:11 2024 +0800
Fix the mismatch between processed record count and actual data count for
inventory dumper (#33996)
---
.../pipeline/core/ingest/dumper/inventory/InventoryDumper.java | 6 ++++--
.../core/ingest/dumper/inventory/InventoryDumperContext.java | 2 ++
.../inventory/splitter/InventoryDumperContextSplitter.java | 10 ++++++++--
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index ff7a42fbb0c..39024c346be 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -144,7 +144,8 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
AtomicLong rowCount = new AtomicLong();
IngestPosition position =
dumperContext.getCommonContext().getPosition();
do {
- QueryRange queryRange = new
QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(),
firstQuery, ((PrimaryKeyIngestPosition<?>) position).getEndValue());
+ QueryRange queryRange = new
QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery
&& dumperContext.isFirstDump(),
+ ((PrimaryKeyIngestPosition<?>) position).getEndValue());
InventoryQueryParameter<?> queryParam = new
InventoryRangeQueryParameter(queryRange);
List<Record> dataRecords = dumpByPage(connection, queryParam,
rowCount, tableMetaData);
if (dataRecords.size() > 1 &&
Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0),
getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
@@ -169,7 +170,8 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
final InventoryQueryParameter<?>
queryParam, final AtomicLong rowCount, final PipelineTableMetaData
tableMetaData) throws SQLException {
DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
int batchSize = dumperContext.getBatchSize();
- try (PreparedStatement preparedStatement =
JDBCStreamQueryBuilder.build(databaseType, connection,
buildDumpByPageSQL(queryParam), batchSize)) {
+ String sql = buildDumpByPageSQL(queryParam);
+ try (PreparedStatement preparedStatement =
JDBCStreamQueryBuilder.build(databaseType, connection, sql, batchSize)) {
runningStatement.set(preparedStatement);
setParameters(preparedStatement, queryParam);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
index 306be54aadd..2052b2c005e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
@@ -58,6 +58,8 @@ public final class InventoryDumperContext {
private JobRateLimitAlgorithm rateLimitAlgorithm;
+ private boolean firstDump = true;
+
public InventoryDumperContext(final DumperCommonContext commonContext) {
this.commonContext = new DumperCommonContext(
commonContext.getDataSourceName(),
commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(),
commonContext.getTableAndSchemaNameMapper());
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index bca5d267888..5fa5458177e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -93,7 +93,7 @@ public final class InventoryDumperContextSplitter {
JobRateLimitAlgorithm rateLimitAlgorithm =
jobProcessContext.getReadRateLimitAlgorithm();
int i = 0;
for (IngestPosition each : getInventoryPositions(dumperContext,
jobItemContext)) {
- result.add(createPrimaryKeySplitDumperContext(dumperContext, each,
i++, batchSize, rateLimitAlgorithm));
+ result.add(createPrimaryKeySplitDumperContext(dumperContext, each,
i++, batchSize, rateLimitAlgorithm, jobItemContext));
}
return result;
}
@@ -149,7 +149,8 @@ public final class InventoryDumperContextSplitter {
}
private InventoryDumperContext createPrimaryKeySplitDumperContext(final
InventoryDumperContext dumperContext, final IngestPosition position,
- final
int shardingItem, final int batchSize, final JobRateLimitAlgorithm
rateLimitAlgorithm) {
+ final
int shardingItem, final int batchSize, final JobRateLimitAlgorithm
rateLimitAlgorithm,
+ final
TransmissionJobItemContext jobItemContext) {
InventoryDumperContext result = new
InventoryDumperContext(dumperContext.getCommonContext());
result.getCommonContext().setPosition(position);
result.setShardingItem(shardingItem);
@@ -159,6 +160,11 @@ public final class InventoryDumperContextSplitter {
result.setInsertColumnNames(dumperContext.getInsertColumnNames());
result.setBatchSize(batchSize);
result.setRateLimitAlgorithm(rateLimitAlgorithm);
+ result.setFirstDump(isFirstDump(jobItemContext));
return result;
}
+
+ private boolean isFirstDump(final TransmissionJobItemContext
jobItemContext) {
+ return null == jobItemContext.getInitProgress() &&
jobItemContext.getProcessedRecordsCount() == 0;
+ }
}