This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d07f233e33a Recover value of migration incremental importer batch size (#34670)
d07f233e33a is described below

commit d07f233e33aa49a853ed23665bef1ec0a944929f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Feb 14 20:42:22 2025 +0800

    Recover value of migration incremental importer batch size (#34670)
---
 .../core/importer/sink/type/PipelineDataSourceSink.java  | 16 ++++++++++++++++
 .../core/ingest/record/group/DataRecordGroupEngine.java  |  9 ++++++---
 .../ingest/record/group/DataRecordGroupEngineTest.java   | 11 ++++++++++-
 .../migration/preparer/MigrationJobPreparer.java         |  3 ++-
 .../migration/primarykey/IndexesMigrationE2EIT.java      |  2 +-
 5 files changed, 35 insertions(+), 6 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index d9f1d81f0ab..87429222f68 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -41,6 +41,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -78,6 +79,10 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         if (dataRecords.isEmpty()) {
             return new PipelineJobUpdateProgress(0);
         }
+        if (dataRecords.iterator().next().getUniqueKeyValue().isEmpty()) {
+            sequentialWrite(dataRecords);
+            return new PipelineJobUpdateProgress(dataRecords.size());
+        }
         for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
             batchWrite(each.getDeleteDataRecords());
             batchWrite(each.getInsertDataRecords());
@@ -86,6 +91,17 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         return new PipelineJobUpdateProgress((int) 
dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == 
each.getType()).count());
     }
     
+    private void sequentialWrite(final List<DataRecord> buffer) {
+        // TODO It's better to use transaction, but delete operation may not take effect on PostgreSQL sometimes
+        try {
+            for (DataRecord each : buffer) {
+                doWrite(Collections.singletonList(each), true);
+            }
+        } catch (final SQLException ex) {
+            throw new PipelineImporterJobWriteException(ex);
+        }
+    }
+    
     @SuppressWarnings("BusyWait")
     @SneakyThrows(InterruptedException.class)
     private void batchWrite(final Collection<DataRecord> records) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index aae1e972858..b8f9c6dacd0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -72,13 +72,16 @@ public final class DataRecordGroupEngine {
     /**
      * Group by table and type.
      *
-     * @param dataRecords data records
+     * @param dataRecords data records, related table must have primary key or unique key
      * @return grouped data records
+     * @throws IllegalArgumentException if related table has no primary key or unique key
      */
     public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
+        DataRecord firstDataRecord = dataRecords.get(0);
+        
ShardingSpherePreconditions.checkState(!firstDataRecord.getUniqueKeyValue().isEmpty(),
+                () -> new 
IllegalArgumentException(firstDataRecord.getTableName() + " must have primary 
key or unique key"));
         List<GroupedDataRecord> result = new ArrayList<>(100);
-        List<DataRecord> mergedDataRecords = 
dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : 
merge(dataRecords);
-        Map<String, List<DataRecord>> tableGroup = 
mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+        Map<String, List<DataRecord>> tableGroup = 
merge(dataRecords).stream().collect(Collectors.groupingBy(DataRecord::getTableName));
         for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
             Map<PipelineSQLOperationType, List<DataRecord>> typeGroup = 
entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
             result.add(new GroupedDataRecord(entry.getKey(), 
typeGroup.getOrDefault(PipelineSQLOperationType.INSERT, 
Collections.emptyList()),
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index fc4bf4bc340..6dddb5bd513 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -216,7 +217,15 @@ class DataRecordGroupEngineTest {
     }
     
     @Test
-    void assertGroup() {
+    void assertGroupOnTableHasNoUniqueKey() {
+        DataRecord dataRecord = new 
DataRecord(PipelineSQLOperationType.INSERT, "order", new 
IngestPlaceholderPosition(), 3);
+        dataRecord.setActualTableName("order_0");
+        List<DataRecord> dataRecords = Collections.singletonList(dataRecord);
+        assertThrows(IllegalArgumentException.class, () -> 
groupEngine.group(dataRecords));
+    }
+    
+    @Test
+    void assertGroupOnTableHasUniqueKey() {
         List<DataRecord> dataRecords = mockDataRecords();
         List<GroupedDataRecord> groupedDataRecords = 
groupEngine.group(dataRecords);
         assertThat(groupedDataRecords.size(), is(2));
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 1d2314aa29e..029b97261e8 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -189,7 +189,8 @@ public final class MigrationJobPreparer implements 
PipelineJobPreparer<Migration
         CreateIncrementalDumperParameter param = new 
CreateIncrementalDumperParameter(
                 dumperContext, dumperContext.getCommonContext().getPosition(), 
channel, jobItemContext.getSourceMetaDataLoader(), 
jobItemContext.getDataSourceManager());
         Dumper dumper = IncrementalDumperCreator.create(param);
-        Collection<Importer> importers = Collections.singletonList(new 
SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), 
jobItemContext));
+        Collection<Importer> importers = Collections.singletonList(new 
SingleChannelConsumerImporter(channel, 
taskConfig.getImporterConfig().getBatchSize(), 1000L,
+                jobItemContext.getSink(), jobItemContext));
         PipelineTask incrementalTask = new 
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), 
incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
     }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index fa9bc36df4b..50afba8399e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -96,7 +96,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             // TODO PostgreSQL update delete events not support if table without unique keys at increment task.
             final Consumer<DataSource> incrementalTaskFn = dataSource -> {
                 if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
-                    doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKeys(mock(AlgorithmSQLContext.class), 
1).iterator().next());
+                    doCreateUpdateDelete(containerComposer, "a1");
                 }
                 Object orderId = 
keyGenerateAlgorithm.generateKeys(mock(AlgorithmSQLContext.class), 
1).iterator().next();
                 insertOneOrder(containerComposer, orderId);

Reply via email to