This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 38ead32b4dc Fix the mismatch between processed record count and actual data count for inventory dumper (#33996)
38ead32b4dc is described below

commit 38ead32b4dcbf144c3aecf73ef4364a282abf904
Author: Haoran Meng <[email protected]>
AuthorDate: Wed Dec 11 10:03:11 2024 +0800

    Fix the mismatch between processed record count and actual data count for inventory dumper (#33996)
---
 .../pipeline/core/ingest/dumper/inventory/InventoryDumper.java |  6 ++++--
 .../core/ingest/dumper/inventory/InventoryDumperContext.java   |  2 ++
 .../inventory/splitter/InventoryDumperContextSplitter.java     | 10 ++++++++--
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index ff7a42fbb0c..39024c346be 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -144,7 +144,8 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
         AtomicLong rowCount = new AtomicLong();
         IngestPosition position = dumperContext.getCommonContext().getPosition();
         do {
-            QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery, ((PrimaryKeyIngestPosition<?>) position).getEndValue());
+            QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery && dumperContext.isFirstDump(),
+                    ((PrimaryKeyIngestPosition<?>) position).getEndValue());
             InventoryQueryParameter<?> queryParam = new InventoryRangeQueryParameter(queryRange);
             List<Record> dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
             if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
@@ -169,7 +170,8 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
                                     final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount, final PipelineTableMetaData tableMetaData) throws SQLException {
         DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         int batchSize = dumperContext.getBatchSize();
-        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpByPageSQL(queryParam), batchSize)) {
+        String sql = buildDumpByPageSQL(queryParam);
+        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, sql, batchSize)) {
             runningStatement.set(preparedStatement);
             setParameters(preparedStatement, queryParam);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
index 306be54aadd..2052b2c005e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
@@ -58,6 +58,8 @@ public final class InventoryDumperContext {
     
     private JobRateLimitAlgorithm rateLimitAlgorithm;
     
+    private boolean firstDump = true;
+    
     public InventoryDumperContext(final DumperCommonContext commonContext) {
         this.commonContext = new DumperCommonContext(
                 commonContext.getDataSourceName(), commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(), commonContext.getTableAndSchemaNameMapper());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index bca5d267888..5fa5458177e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -93,7 +93,7 @@ public final class InventoryDumperContextSplitter {
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
         int i = 0;
         for (IngestPosition each : getInventoryPositions(dumperContext, jobItemContext)) {
-            result.add(createPrimaryKeySplitDumperContext(dumperContext, each, i++, batchSize, rateLimitAlgorithm));
+            result.add(createPrimaryKeySplitDumperContext(dumperContext, each, i++, batchSize, rateLimitAlgorithm, jobItemContext));
         }
         return result;
     }
@@ -149,7 +149,8 @@ public final class InventoryDumperContextSplitter {
     }
     
     private InventoryDumperContext createPrimaryKeySplitDumperContext(final InventoryDumperContext dumperContext, final IngestPosition position,
-                                                                      final int shardingItem, final int batchSize, final JobRateLimitAlgorithm rateLimitAlgorithm) {
+                                                                      final int shardingItem, final int batchSize, final JobRateLimitAlgorithm rateLimitAlgorithm,
+                                                                      final TransmissionJobItemContext jobItemContext) {
         InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
         result.getCommonContext().setPosition(position);
         result.setShardingItem(shardingItem);
@@ -159,6 +160,11 @@ public final class InventoryDumperContextSplitter {
         result.setInsertColumnNames(dumperContext.getInsertColumnNames());
         result.setBatchSize(batchSize);
         result.setRateLimitAlgorithm(rateLimitAlgorithm);
+        result.setFirstDump(isFirstDump(jobItemContext));
         return result;
     }
+    
+    private boolean isFirstDump(final TransmissionJobItemContext jobItemContext) {
+        return null == jobItemContext.getInitProgress() && jobItemContext.getProcessedRecordsCount() == 0;
+    }
 }
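
As a quick orientation for readers skimming the patch: the sketch below restates the guard this change introduces, using hypothetical stand-in names rather than ShardingSphere's actual classes. It assumes the boolean argument passed to QueryRange marks the lower bound of the page query as inclusive; with the new firstDump flag, the begin value is treated as inclusive only on the first page of a job's very first dump, so a resumed inventory task no longer re-dumps (and re-counts) the record at the persisted position.

// Hypothetical sketch only; the class and helpers below are illustrative
// stand-ins, not ShardingSphere APIs.
public final class FirstDumpGuardSketch {

    // Stand-in for the new condition "firstQuery && dumperContext.isFirstDump()":
    // include the lower bound only on the first page of the first dump.
    private static boolean includeLowerBound(final boolean firstQuery, final boolean firstDump) {
        return firstQuery && firstDump;
    }

    // Stand-in for InventoryDumperContextSplitter.isFirstDump(...): a dump counts as
    // the first one when no progress was restored and nothing has been counted yet.
    private static boolean isFirstDump(final Object initProgress, final long processedRecordsCount) {
        return null == initProgress && 0L == processedRecordsCount;
    }

    public static void main(final String[] args) {
        // Fresh job: the first page query includes the begin value.
        System.out.println(includeLowerBound(true, isFirstDump(null, 0L)));
        // Resumed job (restored progress, 42 records already counted): the first page
        // after the restart starts strictly after the persisted position.
        System.out.println(includeLowerBound(true, isFirstDump(new Object(), 42L)));
    }
}

Run as-is, it prints true and then false, which is the behaviour difference the patch relies on.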
