This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7195cdf91a4 Refactor data record merge at pipeline (#25307)
7195cdf91a4 is described below

commit 7195cdf91a445d9de521328d27d1a289a05669cf
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Apr 26 15:28:46 2023 +0800

    Refactor data record merge at pipeline (#25307)
    
    * Refactor data record merge at pipeline
    
    * Combine DataRecord merge and group
    
    * Combine DataRecord merge and group
    
    * improve error reason
---
 .../pipeline/api/ingest/record/DataRecord.java     |   2 +-
 .../api/ingest/record/GroupedDataRecord.java       |   8 +-
 .../job/PipelineImporterJobWriteException.java     |   4 +
 .../pipeline/core/importer/DataRecordMerger.java   | 156 +++++-----------
 .../pipeline/core/importer/DataSourceImporter.java |  73 ++++++--
 .../core/importer/DataRecordMergerTest.java        | 202 +++++++--------------
 .../pipeline/cases/task/E2EIncrementalTask.java    |  15 +-
 .../core/importer/DataSourceImporterTest.java      |   1 +
 8 files changed, 190 insertions(+), 271 deletions(-)

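Before the per-file diffs, a brief orientation: after this refactoring, DataRecordMerger.group() no longer merges records. Records whose key appears only once in a batch go into per-type batch lists, while records that touch the same key are kept in nonBatchRecords in arrival order. A minimal sketch of how the importer consumes that result (names taken from the diff below; illustrative only, not part of the committed patch):

    // Sketch: mirrors the flush loop in DataSourceImporter from this commit.
    List<GroupedDataRecord> grouped = MERGER.group(dataRecords);
    for (GroupedDataRecord each : grouped) {
        // Batch lists hold at most one change per key, so JDBC batching is safe.
        flushInternal(dataSource, each.getBatchDeleteDataRecords());
        flushInternal(dataSource, each.getBatchInsertDataRecords());
        flushInternal(dataSource, each.getBatchUpdateDataRecords());
        // Same-key changes must be applied in their original order, one by one.
        sequentialFlush(dataSource, each.getNonBatchRecords());
    }
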
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index c9cb71f7f6a..956887c95f7 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -33,7 +33,7 @@ import java.util.List;
  */
 @Getter
 @Setter
-@EqualsAndHashCode(of = {"tableName", "uniqueKeyValue"}, callSuper = false)
+@EqualsAndHashCode(of = "tableName", callSuper = false)
 @ToString
 public final class DataRecord extends Record {
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
index d0ae80aaaec..4cb8b94939d 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
@@ -30,9 +30,11 @@ public final class GroupedDataRecord {
     
     private final String tableName;
     
-    private final List<DataRecord> insertDataRecords;
+    private final List<DataRecord> batchInsertDataRecords;
     
-    private final List<DataRecord> updateDataRecords;
+    private final List<DataRecord> batchUpdateDataRecords;
     
-    private final List<DataRecord> deleteDataRecords;
+    private final List<DataRecord> batchDeleteDataRecords;
+    
+    private final List<DataRecord> nonBatchRecords;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index b0f832722f7..48cf2130eea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -30,4 +30,8 @@ public final class PipelineImporterJobWriteException extends PipelineSQLExceptio
     public PipelineImporterJobWriteException(final Exception cause) {
         super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.", cause);
     }
+    
+    public PipelineImporterJobWriteException(final String reason, final Exception cause) {
+        super(XOpenSQLState.GENERAL_ERROR, 91, reason, cause);
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
index 9b7ba2663e4..42dd3f29152 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
@@ -17,20 +17,21 @@
 
 package org.apache.shardingsphere.data.pipeline.core.importer;
 
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord.Key;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -38,37 +39,6 @@ import java.util.stream.Collectors;
  */
 public final class DataRecordMerger {
     
-    /**
-     * Merge data record.
-     * <pre>
-     * insert + insert -&gt; exception
-     * update + insert -&gt; exception
-     * delete + insert -&gt; insert
-     * insert + update -&gt; insert
-     * update + update -&gt; update
-     * delete + update -&gt; exception
-     * insert + delete -&gt; delete
-     * update + delete -&gt; delete
-     * delete + delete -&gt; exception
-     * </pre>
-     *
-     * @param dataRecords data records
-     * @return merged data records
-     */
-    public List<DataRecord> merge(final List<DataRecord> dataRecords) {
-        Map<DataRecord.Key, DataRecord> result = new HashMap<>();
-        dataRecords.forEach(each -> {
-            if (IngestDataChangeType.INSERT.equals(each.getType())) {
-                mergeInsert(each, result);
-            } else if (IngestDataChangeType.UPDATE.equals(each.getType())) {
-                mergeUpdate(each, result);
-            } else if (IngestDataChangeType.DELETE.equals(each.getType())) {
-                mergeDelete(each, result);
-            }
-        });
-        return new ArrayList<>(result.values());
-    }
-    
     /**
      * Group by table and type.
      *
@@ -76,90 +46,50 @@ public final class DataRecordMerger {
      * @return grouped data records
      */
     public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
-        List<GroupedDataRecord> result = new ArrayList<>(100);
-        List<DataRecord> mergedDataRecords = dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : merge(dataRecords);
-        Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
-        for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
-            Map<String, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
-            result.add(new GroupedDataRecord(entry.getKey(), typeGroup.get(IngestDataChangeType.INSERT), typeGroup.get(IngestDataChangeType.UPDATE), typeGroup.get(IngestDataChangeType.DELETE)));
-        }
-        return result;
-    }
-    
-    private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
-        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
-        ShardingSpherePreconditions.checkState(null == beforeDataRecord
-                || IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
-        dataRecords.put(dataRecord.getKey(), dataRecord);
-    }
-    
-    private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
-        DataRecord beforeDataRecord = checkUpdatedPrimaryKey(dataRecord) ? dataRecords.get(dataRecord.getOldKey()) : dataRecords.get(dataRecord.getKey());
-        if (null == beforeDataRecord) {
-            dataRecords.put(dataRecord.getKey(), dataRecord);
-            return;
-        }
-        ShardingSpherePreconditions.checkState(!IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new UnsupportedSQLOperationException("Not Delete"));
-        if (checkUpdatedPrimaryKey(dataRecord)) {
-            dataRecords.remove(dataRecord.getOldKey());
-        }
-        if (IngestDataChangeType.INSERT.equals(beforeDataRecord.getType())) {
-            DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
-            mergedDataRecord.setTableName(dataRecord.getTableName());
-            mergedDataRecord.setType(IngestDataChangeType.INSERT);
-            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
-            return;
+        int insertCount = 0;
+        Map<Key, Boolean> duplicateKeyMap = new HashMap<>();
+        Set<String> tableNames = new LinkedHashSet<>();
+        for (DataRecord each : dataRecords) {
+            if (IngestDataChangeType.INSERT.equals(each.getType())) {
+                insertCount++;
+            }
+            tableNames.add(each.getTableName());
+            Key key = getKeyFromDataRecord(each);
+            if (duplicateKeyMap.containsKey(key)) {
+                duplicateKeyMap.put(key, true);
+                continue;
+            }
+            duplicateKeyMap.put(key, false);
         }
-        if (IngestDataChangeType.UPDATE.equals(beforeDataRecord.getType())) {
-            DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
-            mergedDataRecord.setTableName(dataRecord.getTableName());
-            mergedDataRecord.setType(IngestDataChangeType.UPDATE);
-            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+        List<GroupedDataRecord> result = new ArrayList<>(100);
+        if (insertCount == dataRecords.size()) {
+            Map<String, List<DataRecord>> tableGroup = dataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+            for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
+                result.add(new GroupedDataRecord(entry.getKey(), entry.getValue(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+            }
+            return result;
         }
-    }
-    
-    private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
-        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
-        ShardingSpherePreconditions.checkState(null == beforeDataRecord
-                || !IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
-        if (null != beforeDataRecord && IngestDataChangeType.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedPrimaryKey(beforeDataRecord)) {
-            DataRecord mergedDataRecord = new DataRecord(dataRecord.getPosition(), dataRecord.getColumnCount());
-            for (int i = 0; i < dataRecord.getColumnCount(); i++) {
-                mergedDataRecord.addColumn(new Column(dataRecord.getColumn(i).getName(),
-                        dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(), true, dataRecord.getColumn(i).isUniqueKey()));
+        Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
+        Map<String, Map<String, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
+        for (DataRecord each : dataRecords) {
+            Key key = getKeyFromDataRecord(each);
+            if (duplicateKeyMap.getOrDefault(key, false)) {
+                nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
+                continue;
             }
-            mergedDataRecord.setTableName(dataRecord.getTableName());
-            mergedDataRecord.setType(IngestDataChangeType.DELETE);
-            dataRecords.remove(beforeDataRecord.getKey());
-            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
-        } else {
-            dataRecords.put(dataRecord.getOldKey(), dataRecord);
+            Map<String, List<DataRecord>> recordMap = batchDataRecords.computeIfAbsent(each.getTableName(), ignored -> new HashMap<>());
+            recordMap.computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
         }
-    }
-    
-    private boolean checkUpdatedPrimaryKey(final DataRecord dataRecord) {
-        return RecordUtils.extractPrimaryColumns(dataRecord).stream().anyMatch(Column::isUpdated);
-    }
-    
-    private DataRecord mergeColumn(final DataRecord preDataRecord, final DataRecord curDataRecord) {
-        DataRecord result = new DataRecord(curDataRecord.getPosition(), curDataRecord.getColumnCount());
-        for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
-            result.addColumn(new Column(
-                    curDataRecord.getColumn(i).getName(),
-                    preDataRecord.getColumn(i).isUniqueKey()
-                            ? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
-                            : null,
-                    curDataRecord.getColumn(i).getValue(),
-                    preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
-                    curDataRecord.getColumn(i).isUniqueKey()));
+        for (String each : tableNames) {
+            Map<String, List<DataRecord>> batchMap = batchDataRecords.getOrDefault(each, Collections.emptyMap());
+            List<DataRecord> nonBatchRecordMap = nonBatchRecords.getOrDefault(each, Collections.emptyList());
+            result.add(new GroupedDataRecord(each, batchMap.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
+                    batchMap.getOrDefault(IngestDataChangeType.UPDATE, Collections.emptyList()), batchMap.getOrDefault(IngestDataChangeType.DELETE, Collections.emptyList()), nonBatchRecordMap));
         }
         return result;
     }
     
-    private Object mergePrimaryKeyOldValue(final Column beforeColumn, final Column column) {
-        if (beforeColumn.isUpdated()) {
-            return beforeColumn.getOldValue();
-        }
-        return column.isUpdated() ? column.getOldValue() : null;
+    private Key getKeyFromDataRecord(final DataRecord dataRecord) {
+        return IngestDataChangeType.DELETE.equals(dataRecord.getType()) ? dataRecord.getOldKey() : dataRecord.getKey();
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 83692765b88..1ba4b49f1f5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -48,10 +48,13 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Default importer.
@@ -119,9 +122,10 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         }
         List<GroupedDataRecord> result = MERGER.group(dataRecords);
         for (GroupedDataRecord each : result) {
-            flushInternal(dataSource, each.getDeleteDataRecords());
-            flushInternal(dataSource, each.getInsertDataRecords());
-            flushInternal(dataSource, each.getUpdateDataRecords());
+            flushInternal(dataSource, each.getBatchDeleteDataRecords());
+            flushInternal(dataSource, each.getBatchInsertDataRecords());
+            flushInternal(dataSource, each.getBatchUpdateDataRecords());
+            sequentialFlush(dataSource, each.getNonBatchRecords());
         }
         return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
     }
@@ -130,15 +134,11 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         if (null == buffer || buffer.isEmpty()) {
             return;
         }
-        try {
-            tryFlush(dataSource, buffer);
-        } catch (final SQLException ex) {
-            throw new PipelineImporterJobWriteException(ex);
-        }
+        tryFlush(dataSource, buffer);
     }
     
     @SneakyThrows(InterruptedException.class)
-    private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
+    private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
         for (int i = 0; isRunning() && i <= importerConfig.getRetryTimes(); i++) {
             try {
                 doFlush(dataSource, buffer);
@@ -146,7 +146,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
             } catch (final SQLException ex) {
                 log.error("flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
                 if (i == importerConfig.getRetryTimes()) {
-                    throw ex;
+                    throw new PipelineImporterJobWriteException(ex);
                 }
                 Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i));
             }
@@ -182,6 +182,30 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         }
     }
     
+    private void doFlush(final Connection connection, final DataRecord each) throws SQLException {
+        switch (each.getType()) {
+            case IngestDataChangeType.INSERT:
+                if (null != rateLimitAlgorithm) {
+                    rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+                }
+                executeBatchInsert(connection, Collections.singletonList(each));
+                break;
+            case IngestDataChangeType.UPDATE:
+                if (null != rateLimitAlgorithm) {
+                    rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
+                }
+                executeUpdate(connection, each);
+                break;
+            case IngestDataChangeType.DELETE:
+                if (null != rateLimitAlgorithm) {
+                    rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
+                }
+                executeBatchDelete(connection, Collections.singletonList(each));
+                break;
+            default:
+        }
+    }
+    
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
         String insertSql = pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
@@ -243,16 +267,41 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
             for (DataRecord each : dataRecords) {
                 conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                 for (int i = 0; i < conditionColumns.size(); i++) {
-                    preparedStatement.setObject(i + 1, conditionColumns.get(i).getOldValue());
+                    Object oldValue = conditionColumns.get(i).getOldValue();
+                    if (null == oldValue) {
+                        log.warn("Record old value is null, record={}", each);
+                    }
+                    preparedStatement.setObject(i + 1, oldValue);
                 }
                 preparedStatement.addBatch();
             }
-            preparedStatement.executeBatch();
+            int[] counts = preparedStatement.executeBatch();
+            if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
+                log.warn("batchDelete failed, counts={}, sql={}", 
Arrays.toString(counts), deleteSQL);
+            }
         } finally {
             batchDeleteStatement = null;
         }
     }
     
+    private void sequentialFlush(final DataSource dataSource, final List<DataRecord> buffer) {
+        if (buffer.isEmpty()) {
+            return;
+        }
+        try (Connection connection = dataSource.getConnection()) {
+            // TODO it's better use transaction, but execute delete maybe not effect when open transaction of PostgreSQL sometimes
+            for (DataRecord each : buffer) {
+                try {
+                    doFlush(connection, each);
+                } catch (final SQLException ex) {
+                    throw new PipelineImporterJobWriteException(String.format("Write failed, record=%s", each), ex);
+                }
+            }
+        } catch (final SQLException ex) {
+            throw new PipelineImporterJobWriteException(ex);
+        }
+    }
+    
     @Override
     protected void doStop() throws SQLException {
         cancelStatement(batchInsertStatement);
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
index fa8c7c5ca33..93f6cc62a77 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
@@ -21,213 +21,143 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPo
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 class DataRecordMergerTest {
     
     private final DataRecordMerger dataRecordMerger = new DataRecordMerger();
     
-    private DataRecord beforeDataRecord;
-    
-    private DataRecord afterDataRecord;
-    
-    private Collection<DataRecord> actual;
-    
-    @Test
-    void assertInsertBeforeInsert() {
-        beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
-    }
-    
-    @Test
-    void assertUpdateBeforeInsert() {
-        beforeDataRecord = mockUpdateDataRecord(1, 2, 2);
-        afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
-    }
-    
     @Test
     void assertDeleteBeforeInsert() {
-        beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
-        afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
+        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertInsertBeforeUpdate() {
-        beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.INSERT));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertInsertBeforeUpdatePrimaryKey() {
-        beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.INSERT));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdateBeforeUpdate() {
-        beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdateBeforeUpdatePrimaryKey() {
-        beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
-    }
-    
-    @Test
-    void assertUpdatePrimaryKeyBeforeUpdate() {
-        beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(2, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
-        assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertDataRecordsMatched(actual.iterator().next().getBatchUpdateDataRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
-        beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(3));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
-    }
-    
-    @Test
-    void assertDeleteBeforeUpdate() {
-        beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        assertThrows(UnsupportedSQLOperationException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+        assertThat(actual.get(0).getBatchUpdateDataRecords().size(), is(2));
+        GroupedDataRecord actualGroupedDataRecord = actual.get(0);
+        DataRecord actualFirstDataRecord = actualGroupedDataRecord.getBatchUpdateDataRecords().get(0);
+        assertThat(actualFirstDataRecord.getType(), is(IngestDataChangeType.UPDATE));
+        assertThat(actualFirstDataRecord.getTableName(), is("order"));
+        assertThat(actualFirstDataRecord.getColumn(0).getOldValue(), is(1));
+        assertThat(actualFirstDataRecord.getColumn(0).getValue(), is(2));
+        assertThat(actualFirstDataRecord.getColumn(1).getValue(), is(1));
+        assertThat(actualFirstDataRecord.getColumn(2).getValue(), is(1));
+        DataRecord actualSecondDataRecord = actualGroupedDataRecord.getBatchUpdateDataRecords().get(1);
+        assertThat(actualSecondDataRecord.getType(), is(IngestDataChangeType.UPDATE));
+        assertThat(actualSecondDataRecord.getTableName(), is("order"));
+        assertThat(actualSecondDataRecord.getColumn(0).getOldValue(), is(2));
+        assertThat(actualSecondDataRecord.getColumn(0).getValue(), is(3));
+        assertThat(actualSecondDataRecord.getColumn(1).getValue(), is(2));
+        assertThat(actualSecondDataRecord.getColumn(2).getValue(), is(2));
     }
     
     @Test
     void assertInsertBeforeDelete() {
-        beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdateBeforeDelete() {
-        beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdatePrimaryKeyBeforeDelete() {
-        beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        afterDataRecord = mockDeleteDataRecord(2, 1, 1);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.DELETE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(1));
-        assertThat(dataRecord.getColumn(2).getValue(), is(1));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
-    @Test
-    void assertDeleteBeforeDelete() {
-        beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
-        afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+    private void assertDataRecordsMatched(final List<DataRecord> actualRecords, final List<DataRecord> expectedRecords) {
+        for (int i = 0; i < actualRecords.size(); i++) {
+            assertThat(actualRecords.get(0), sameInstance(expectedRecords.get(0)));
+        }
     }
     
     @Test
     void assertGroup() {
-        List<DataRecord> dataRecords = mockDataRecords();
-        List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
-        assertThat(groupedDataRecords.size(), is(2));
-        assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
-        assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
-        assertThat(groupedDataRecords.get(0).getInsertDataRecords().size(), is(1));
-        assertThat(groupedDataRecords.get(0).getUpdateDataRecords().size(), is(1));
-        assertThat(groupedDataRecords.get(0).getDeleteDataRecords().size(), is(1));
-    }
-    
-    private List<DataRecord> mockDataRecords() {
-        return Arrays.asList(
+        List<DataRecord> dataRecords = Arrays.asList(
                 mockInsertDataRecord("t1", 1, 1, 1),
                 mockUpdateDataRecord("t1", 1, 2, 1),
                 mockUpdateDataRecord("t1", 1, 2, 2),
                 mockUpdateDataRecord("t1", 2, 1, 1),
                 mockUpdateDataRecord("t1", 2, 2, 1),
                 mockUpdateDataRecord("t1", 2, 2, 2),
+                mockInsertDataRecord("t1", 10, 10, 10),
                 mockDeleteDataRecord("t1", 3, 1, 1),
                 mockInsertDataRecord("t2", 1, 1, 1));
+        List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
+        assertThat(groupedDataRecords.size(), is(2));
+        assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
+        assertThat(groupedDataRecords.get(0).getBatchInsertDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getBatchUpdateDataRecords().size(), is(0));
+        assertThat(groupedDataRecords.get(0).getBatchDeleteDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getNonBatchRecords().size(), is(6));
+        assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
+        assertThat(groupedDataRecords.get(1).getBatchInsertDataRecords().size(), is(1));
     }
     
     private DataRecord mockInsertDataRecord(final int id, final int userId, final int totalPrice) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 7298b116ef0..19b9e2b09e9 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -101,20 +101,22 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
         int randomInt = random.nextInt(-100, 100);
         if (databaseType instanceof MySQLDatabaseType) {
             String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), orderTableName, "?");
-            log.info("update sql: {}", sql);
             int randomUnsignedInt = random.nextInt(10, 100);
             LocalDateTime now = LocalDateTime.now();
-            DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
+            Object[] parameters = {"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
                     randomUnsignedInt, 1.0F, 1.0, new BigDecimal("999"), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue() + 1, new byte[]{}, new byte[]{1, 2, -1, -3},
-                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId});
+                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId};
+            log.info("update sql: {}, params: {}", sql, parameters);
+            DataSourceExecuteUtils.execute(dataSource, sql, parameters);
             return;
         }
         if (databaseType instanceof SchemaSupportedDatabaseType) {
             String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), orderTableName, "?");
-            log.info("update sql: {}", sql);
-            DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
+            Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
                     PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
-                    LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId});
+                    LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId};
+            log.info("update sql: {}, params: {}", sql, parameters);
+            DataSourceExecuteUtils.execute(dataSource, sql, parameters);
         }
     }
     
@@ -124,6 +126,7 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
     
     private void deleteOrderById(final Object orderId) {
         String sql = SQLBuilderUtils.buildDeleteSQL(orderTableName, "order_id");
+        log.info("delete sql: {}, params: {}", sql, orderId);
         DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{orderId});
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index d5466ba5ee3..dc0140be0d5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -106,6 +106,7 @@ class DataSourceImporterTest {
         DataRecord deleteRecord = getDataRecord("DELETE");
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(deleteRecord));
+        when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);

