This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a657be1f5e1 Recover DataRecordGroupEngine (#29776)
a657be1f5e1 is described below

commit a657be1f5e1fc9995d7637bc73b533e8330d360d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 19 17:43:47 2024 +0800

    Recover DataRecordGroupEngine (#29776)
    
    * Recover DataRecordGroupEngine
    
    * Update unit test and E2E test
---
 .../user-manual/error-code/sql-error-code.cn.md    |   1 +
 .../user-manual/error-code/sql-error-code.en.md    |   1 +
 ...ipelineUnexpectedDataRecordOrderException.java} |  27 ++--
 .../importer/sink/type/PipelineDataSourceSink.java |  21 +--
 .../pipeline/core/ingest/record/DataRecord.java    |  11 +-
 .../ingest/record/group/DataRecordGroupEngine.java | 145 ++++++++++++++----
 .../ingest/record/group/GroupedDataRecord.java     |   8 +-
 .../core/ingest/record/DataRecordTest.java         |   6 +-
 .../record/group/DataRecordGroupEngineTest.java    | 168 ++++++++++++++-------
 9 files changed, 261 insertions(+), 127 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 6f22a1ec204..2d8149b54d9 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -118,6 +118,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | HY000     | 18020       | Failed to get DDL for table \`%s\`.                                               |
 | 42S01     | 18030       | Duplicate storage unit names \`%s\`.                                              |
 | 42S02     | 18031       | Storage units names \`%s\` do not exist.                                          |
+| HY000     | 18050       | Before data record is \`%s\`, after data record is \`%s\`.                         |
 | 08000     | 18051       | Data check table \`%s\` failed.                                                   |
 | 0A000     | 18052       | Unsupported pipeline database type \`%s\`.                                        |
 | 0A000     | 18053       | Unsupported CRC32 data consistency calculate algorithm with database type \`%s\`.  |
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 445b6842ac2..ee3f90f4c43 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -118,6 +118,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
 | HY000     | 18020       | Failed to get DDL for table \`%s\`.                                               |
 | 42S01     | 18030       | Duplicate storage unit names \`%s\`.                                              |
 | 42S02     | 18031       | Storage units names \`%s\` do not exist.                                          |
+| HY000     | 18050       | Before data record is \`%s\`, after data record is \`%s\`.                         |
 | 08000     | 18051       | Data check table \`%s\` failed.                                                   |
 | 0A000     | 18052       | Unsupported pipeline database type \`%s\`.                                        |
 | 0A000     | 18053       | Unsupported CRC32 data consistency calculate algorithm with database type \`%s\`.  |
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
similarity index 53%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
index ef54b3c7743..e285cbf1166 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
@@ -15,25 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
+package org.apache.shardingsphere.data.pipeline.core.exception.data;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
+import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 
-import java.util.Collection;
-
-@RequiredArgsConstructor
-@Getter
-public final class GroupedDataRecord {
-    
-    private final String tableName;
-    
-    private final Collection<DataRecord> batchInsertDataRecords;
-    
-    private final Collection<DataRecord> batchUpdateDataRecords;
+/**
+ * Pipeline unexpected data record order exception.
+ */
+public final class PipelineUnexpectedDataRecordOrderException extends PipelineSQLException {
     
-    private final Collection<DataRecord> batchDeleteDataRecords;
+    private static final long serialVersionUID = 6023695604738387750L;
     
-    private final Collection<DataRecord> nonBatchRecords;
+    public PipelineUnexpectedDataRecordOrderException(final DataRecord beforeDataRecord, final DataRecord afterDataRecord) {
+        super(XOpenSQLState.GENERAL_ERROR, 50, String.format("Before data record is `%s`, after data record is `%s`.", beforeDataRecord, afterDataRecord));
+    }
 }
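
Note the link to the documentation change above: vendor code 50 here surfaces as SQL error code 18050 (HY000) in the error-code tables, which suggests PipelineSQLException offsets kernel vendor codes into the 18xxx range (an inference from this diff). A rough sketch of the intended use, mirroring the guards added to DataRecordGroupEngine further below (beforeRecord and record are hypothetical):

    // Hypothetical guard: e.g. two INSERTs for the same key in one batch is an invalid ordering.
    ShardingSpherePreconditions.checkState(null == beforeRecord || PipelineSQLOperationType.DELETE == beforeRecord.getType(),
            () -> new PipelineUnexpectedDataRecordOrderException(beforeRecord, record));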
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index f75363e4180..dcbe4dcaf3a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -39,7 +39,6 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -72,15 +71,14 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     @Override
     public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
-        Collection<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
+        List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
         if (dataRecords.isEmpty()) {
             return new PipelineJobProgressUpdatedParameter(0);
         }
         for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
-            batchWrite(each.getBatchDeleteDataRecords());
-            batchWrite(each.getBatchInsertDataRecords());
-            batchWrite(each.getBatchUpdateDataRecords());
-            sequentialWrite(each.getNonBatchRecords());
+            batchWrite(each.getDeleteDataRecords());
+            batchWrite(each.getInsertDataRecords());
+            batchWrite(each.getUpdateDataRecords());
         }
         return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
     }
@@ -105,17 +103,6 @@ public final class PipelineDataSourceSink implements PipelineSink {
         }
     }
     
-    private void sequentialWrite(final Collection<DataRecord> records) {
-        // TODO it's better use transaction, but execute delete maybe not effect when open transaction of PostgreSQL sometimes
-        try {
-            for (DataRecord each : records) {
-                doWrite(Collections.singleton(each));
-            }
-        } catch (final SQLException ex) {
-            throw new PipelineImporterJobWriteException(ex);
-        }
-    }
-    
     private void doWrite(final Collection<DataRecord> records) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             boolean enableTransaction = records.size() > 1;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index bf2abf59515..9e047d3e3f9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -105,7 +105,16 @@ public final class DataRecord extends Record {
      * @return key
      */
     public Key getKey() {
-        return PipelineSQLOperationType.DELETE == type ? new Key(tableName, oldUniqueKeyValues) : new Key(tableName, uniqueKeyValue);
+        return new Key(tableName, uniqueKeyValue);
+    }
+    
+    /**
+     * Get old key.
+     *
+     * @return key
+     */
+    public Key getOldKey() {
+        return new Key(tableName, oldUniqueKeyValues);
     }
     
     @RequiredArgsConstructor
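
A brief sketch of the getKey()/getOldKey() split (hypothetical values; the five-argument Column constructor is the (name, oldValue, value, updated, uniqueKey) form used in the tests below). An UPDATE that changed the unique key from 1 to 2 is addressed by its old key when matching the pre-image row and by its new key afterwards:

    // Hypothetical: an UPDATE record whose unique key column changed from 1 to 2.
    DataRecord record = new DataRecord(PipelineSQLOperationType.UPDATE, "t_order", new IngestPlaceholderPosition(), 1);
    record.addColumn(new Column("order_id", 1, 2, true, true));
    record.getOldKey(); // Key("t_order", [1]): addresses the row as it was before the update
    record.getKey();    // Key("t_order", [2]): addresses the row after the update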
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index 0e9a27eac5e..75317b91bc6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -18,18 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
 
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord.Key;
+import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
 
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.EnumMap;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
 /**
@@ -38,41 +38,124 @@ import java.util.stream.Collectors;
 public final class DataRecordGroupEngine {
     
     /**
-     * Group by table and operation type.
+     * Merge data record.
+     * <pre>
+     * insert + insert -&gt; exception
+     * update + insert -&gt; exception
+     * delete + insert -&gt; insert
+     * insert + update -&gt; insert
+     * update + update -&gt; update
+     * delete + update -&gt; exception
+     * insert + delete -&gt; delete
+     * update + delete -&gt; delete
+     * delete + delete -&gt; exception
+     * </pre>
      *
-     * @param records data records
+     * @param dataRecords data records
+     * @return merged data records
+     */
+    public List<DataRecord> merge(final List<DataRecord> dataRecords) {
+        Map<DataRecord.Key, DataRecord> result = new HashMap<>();
+        dataRecords.forEach(each -> {
+            if (PipelineSQLOperationType.INSERT.equals(each.getType())) {
+                mergeInsert(each, result);
+            } else if (PipelineSQLOperationType.UPDATE.equals(each.getType())) {
+                mergeUpdate(each, result);
+            } else if (PipelineSQLOperationType.DELETE.equals(each.getType())) {
+                mergeDelete(each, result);
+            }
+        });
+        return new ArrayList<>(result.values());
+    }
+    
+    /**
+     * Group by table and type.
+     *
+     * @param dataRecords data records
      * @return grouped data records
      */
-    public Collection<GroupedDataRecord> group(final Collection<DataRecord> records) {
-        Map<Key, Boolean> duplicateKeys = getDuplicateKeys(records);
-        Collection<String> tableNames = new LinkedHashSet<>();
-        Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
-        Map<String, Map<PipelineSQLOperationType, Collection<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
-        for (DataRecord each : records) {
-            tableNames.add(each.getTableName());
-            if (duplicateKeys.getOrDefault(each.getKey(), false)) {
-                nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
-            } else {
-                batchDataRecords.computeIfAbsent(
-                        each.getTableName(), ignored -> new EnumMap<>(PipelineSQLOperationType.class)).computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
+    public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
+        List<GroupedDataRecord> result = new ArrayList<>(100);
+        List<DataRecord> mergedDataRecords = dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : merge(dataRecords);
+        Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+        for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
+            Map<PipelineSQLOperationType, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
+            result.add(new GroupedDataRecord(entry.getKey(), typeGroup.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
+                    typeGroup.getOrDefault(PipelineSQLOperationType.UPDATE, Collections.emptyList()), typeGroup.getOrDefault(PipelineSQLOperationType.DELETE, Collections.emptyList())));
+        }
+        return result;
+    }
+    
+    private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
+        ShardingSpherePreconditions.checkState(null == beforeDataRecord
+                || PipelineSQLOperationType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
+        dataRecords.put(dataRecord.getKey(), dataRecord);
+    }
+    
+    private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = checkUpdatedUniqueKey(dataRecord) ? dataRecords.get(dataRecord.getOldKey()) : dataRecords.get(dataRecord.getKey());
+        if (null == beforeDataRecord) {
+            dataRecords.put(dataRecord.getKey(), dataRecord);
+            return;
+        }
+        ShardingSpherePreconditions.checkState(!PipelineSQLOperationType.DELETE.equals(beforeDataRecord.getType()), () -> new UnsupportedSQLOperationException("Not Delete"));
+        if (checkUpdatedUniqueKey(dataRecord)) {
+            dataRecords.remove(dataRecord.getOldKey());
+        }
+        if (PipelineSQLOperationType.INSERT.equals(beforeDataRecord.getType())) {
+            DataRecord mergedDataRecord = mergeColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(), beforeDataRecord, dataRecord);
+            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+            return;
+        }
+        if (PipelineSQLOperationType.UPDATE.equals(beforeDataRecord.getType())) {
+            DataRecord mergedDataRecord = mergeColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(), beforeDataRecord, dataRecord);
+            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+        }
+    }
+    
+    private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
+        ShardingSpherePreconditions.checkState(null == beforeDataRecord
+                || !PipelineSQLOperationType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
+        if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedUniqueKey(beforeDataRecord)) {
+            DataRecord mergedDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), dataRecord.getPosition(), dataRecord.getColumnCount());
+            for (int i = 0; i < dataRecord.getColumnCount(); i++) {
+                mergedDataRecord.addColumn(new Column(dataRecord.getColumn(i).getName(),
+                        dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(), true, dataRecord.getColumn(i).isUniqueKey()));
+            }
+            dataRecords.remove(beforeDataRecord.getKey());
+            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+        } else {
+            dataRecords.put(dataRecord.getOldKey(), dataRecord);
+        }
+    }
+    
+    private boolean checkUpdatedUniqueKey(final DataRecord dataRecord) {
+        for (Column each : dataRecord.getColumns()) {
+            if (each.isUniqueKey() && each.isUpdated()) {
+                return true;
             }
         }
-        return tableNames.stream().map(each -> getGroupedDataRecord(
-                each, batchDataRecords.getOrDefault(each, Collections.emptyMap()), nonBatchRecords.getOrDefault(each, Collections.emptyList()))).collect(Collectors.toList());
+        return false;
     }
     
-    private Map<Key, Boolean> getDuplicateKeys(final Collection<DataRecord> records) {
-        Map<Key, Boolean> result = new HashMap<>();
-        for (DataRecord each : records) {
-            Key key = each.getKey();
-            result.put(key, result.containsKey(key));
+    private DataRecord mergeColumn(final PipelineSQLOperationType type, final String tableName, final DataRecord preDataRecord, final DataRecord curDataRecord) {
+        DataRecord result = new DataRecord(type, tableName, curDataRecord.getPosition(), curDataRecord.getColumnCount());
+        for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
+            result.addColumn(new Column(
+                    curDataRecord.getColumn(i).getName(),
+                    preDataRecord.getColumn(i).isUniqueKey()
+                            ? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
+                            : null,
+                    curDataRecord.getColumn(i).getValue(),
+                    preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
+                    curDataRecord.getColumn(i).isUniqueKey()));
+                    curDataRecord.getColumn(i).isUniqueKey()));
         }
         return result;
     }
     
-    private GroupedDataRecord getGroupedDataRecord(final String tableName, final Map<PipelineSQLOperationType, Collection<DataRecord>> batchRecords, final Collection<DataRecord> nonBatchRecords) {
-        return new GroupedDataRecord(tableName, batchRecords.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
-                batchRecords.getOrDefault(PipelineSQLOperationType.UPDATE, Collections.emptyList()), batchRecords.getOrDefault(PipelineSQLOperationType.DELETE, Collections.emptyList()),
-                nonBatchRecords);
+    private Object mergePrimaryKeyOldValue(final Column beforeColumn, final Column column) {
+        return beforeColumn.isUpdated() ? beforeColumn.getOldValue() : (column.isUpdated() ? column.getOldValue() : null);
     }
 }
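
A minimal usage sketch of the recovered engine (insertRecord and updateRecord are hypothetical DataRecord instances sharing one unique key, shaped like the test helpers below): per the rule table above, insert + update merges into a single insert carrying the updated values.

    DataRecordGroupEngine engine = new DataRecordGroupEngine();
    // insert + update -> insert: one INSERT record remains, carrying the UPDATE's column values.
    List<DataRecord> merged = engine.merge(Arrays.asList(insertRecord, updateRecord));
    // Batch the merged records per table and per operation type for the sink to write.
    List<GroupedDataRecord> grouped = engine.group(merged);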
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
index ef54b3c7743..45d58c5381f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
@@ -29,11 +29,9 @@ public final class GroupedDataRecord {
     
     private final String tableName;
     
-    private final Collection<DataRecord> batchInsertDataRecords;
+    private final Collection<DataRecord> insertDataRecords;
     
-    private final Collection<DataRecord> batchUpdateDataRecords;
+    private final Collection<DataRecord> updateDataRecords;
     
-    private final Collection<DataRecord> batchDeleteDataRecords;
-    
-    private final Collection<DataRecord> nonBatchRecords;
+    private final Collection<DataRecord> deleteDataRecords;
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
index f10f4b10d13..c991138416c 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
@@ -27,7 +27,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 class DataRecordTest {
     
     @Test
-    void assertGetKeyWithUpdate() {
+    void assertGetKey() {
         DataRecord beforeDataRecord = new DataRecord(PipelineSQLOperationType.UPDATE, "foo_tbl", new IngestPlaceholderPosition(), 1);
         beforeDataRecord.addColumn(new Column("id", 1, true, true));
         DataRecord afterDataRecord = new DataRecord(PipelineSQLOperationType.UPDATE, "foo_tbl", new IngestPlaceholderPosition(), 1);
@@ -36,11 +36,11 @@ class DataRecordTest {
     }
     
     @Test
-    void assertGetKeyWithDelete() {
+    void assertGetOldKey() {
         DataRecord beforeDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, "foo_tbl", new IngestPlaceholderPosition(), 1);
         beforeDataRecord.addColumn(new Column("id", 1, 2, true, true));
         DataRecord afterDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, "foo_tbl", new IngestPlaceholderPosition(), 1);
         afterDataRecord.addColumn(new Column("id", 1, 3, true, true));
-        assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
+        assertThat(beforeDataRecord.getOldKey(), is(afterDataRecord.getOldKey()));
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 64a6b84769b..5a3451f4ad8 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -18,149 +18,209 @@
 package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
 
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 class DataRecordGroupEngineTest {
     
+    private final DataRecordGroupEngine groupEngine = new DataRecordGroupEngine();
+    
+    @Test
+    void assertInsertBeforeInsert() {
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+    }
+    
+    @Test
+    void assertUpdateBeforeInsert() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 2);
+        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+    }
+    
     @Test
     void assertDeleteBeforeInsert() {
         DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
         DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
     }
     
     @Test
     void assertInsertBeforeUpdate() {
         DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertNull(dataRecord.getColumn(0).getOldValue());
+        assertThat(dataRecord.getColumn(0).getValue(), is(1));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
     }
     
     @Test
     void assertInsertBeforeUpdatePrimaryKey() {
         DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertNull(dataRecord.getColumn(0).getOldValue());
+        assertThat(dataRecord.getColumn(0).getValue(), is(2));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
     }
     
     @Test
     void assertUpdateBeforeUpdate() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertNull(dataRecord.getColumn(0).getOldValue());
+        assertThat(dataRecord.getColumn(0).getValue(), is(1));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
     }
     
     @Test
     void assertUpdateBeforeUpdatePrimaryKey() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+        assertThat(dataRecord.getColumn(0).getValue(), is(2));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+    }
+    
+    @Test
+    void assertUpdatePrimaryKeyBeforeUpdate() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(2, 2, 2);
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getBatchUpdateDataRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+        assertThat(dataRecord.getColumn(0).getValue(), is(2));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
     }
     
     @Test
     void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        GroupedDataRecord actualGroupedDataRecord = actual.iterator().next();
-        assertThat(actualGroupedDataRecord.getBatchUpdateDataRecords().size(), is(2));
-        Iterator<DataRecord> batchUpdateDataRecords = actualGroupedDataRecord.getBatchUpdateDataRecords().iterator();
-        DataRecord actualDataRecord1 = batchUpdateDataRecords.next();
-        assertThat(actualDataRecord1.getType(), is(PipelineSQLOperationType.UPDATE));
-        assertThat(actualDataRecord1.getTableName(), is("order"));
-        assertThat(actualDataRecord1.getColumn(0).getOldValue(), is(1));
-        assertThat(actualDataRecord1.getColumn(0).getValue(), is(2));
-        assertThat(actualDataRecord1.getColumn(1).getValue(), is(1));
-        assertThat(actualDataRecord1.getColumn(2).getValue(), is(1));
-        DataRecord actualDataRecord2 = batchUpdateDataRecords.next();
-        assertThat(actualDataRecord2.getType(), is(PipelineSQLOperationType.UPDATE));
-        assertThat(actualDataRecord2.getTableName(), is("order"));
-        assertThat(actualDataRecord2.getColumn(0).getOldValue(), is(2));
-        assertThat(actualDataRecord2.getColumn(0).getValue(), is(3));
-        assertThat(actualDataRecord2.getColumn(1).getValue(), is(2));
-        assertThat(actualDataRecord2.getColumn(2).getValue(), is(2));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+        assertThat(dataRecord.getColumn(0).getValue(), is(3));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+    }
+    
+    @Test
+    void assertDeleteBeforeUpdate() {
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        assertThrows(UnsupportedSQLOperationException.class, () -> groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
     }
     
     @Test
     void assertInsertBeforeDelete() {
         DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
     }
     
     @Test
     void assertUpdateBeforeDelete() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
     }
     
     @Test
     void assertUpdatePrimaryKeyBeforeDelete() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
         DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(PipelineSQLOperationType.DELETE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertNull(dataRecord.getColumn(0).getOldValue());
+        assertThat(dataRecord.getColumn(0).getValue(), is(1));
+        assertThat(dataRecord.getColumn(1).getValue(), is(1));
+        assertThat(dataRecord.getColumn(2).getValue(), is(1));
     }
     
-    private void assertDataRecordsMatched(final Collection<DataRecord> actualRecords, final List<DataRecord> expectedRecords) {
-        for (int i = 0; i < actualRecords.size(); i++) {
-            assertThat(actualRecords.iterator().next(), sameInstance(expectedRecords.get(0)));
-        }
+    @Test
+    void assertDeleteBeforeDelete() {
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
     }
     
     @Test
     void assertGroup() {
-        List<DataRecord> dataRecords = Arrays.asList(
+        List<DataRecord> dataRecords = mockDataRecords();
+        List<GroupedDataRecord> groupedDataRecords = groupEngine.group(dataRecords);
+        assertThat(groupedDataRecords.size(), is(2));
+        assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
+        assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
+        assertThat(groupedDataRecords.get(0).getInsertDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getUpdateDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getDeleteDataRecords().size(), is(1));
+    }
+    
+    private List<DataRecord> mockDataRecords() {
+        return Arrays.asList(
                 mockInsertDataRecord("t1", 1, 1, 1),
                 mockUpdateDataRecord("t1", 1, 2, 1),
                 mockUpdateDataRecord("t1", 1, 2, 2),
                 mockUpdateDataRecord("t1", 2, 1, 1),
                 mockUpdateDataRecord("t1", 2, 2, 1),
                 mockUpdateDataRecord("t1", 2, 2, 2),
-                mockInsertDataRecord("t1", 10, 10, 10),
                 mockDeleteDataRecord("t1", 3, 1, 1),
                 mockInsertDataRecord("t2", 1, 1, 1));
-        Collection<GroupedDataRecord> groupedDataRecords = new DataRecordGroupEngine().group(dataRecords);
-        assertThat(groupedDataRecords.size(), is(2));
-        Iterator<GroupedDataRecord> groupedDataRecordsIterator = groupedDataRecords.iterator();
-        GroupedDataRecord actualGroupedDataRecord1 = groupedDataRecordsIterator.next();
-        assertThat(actualGroupedDataRecord1.getTableName(), is("t1"));
-        assertThat(actualGroupedDataRecord1.getBatchInsertDataRecords().size(), is(1));
-        assertThat(actualGroupedDataRecord1.getBatchUpdateDataRecords().size(), is(0));
-        assertThat(actualGroupedDataRecord1.getBatchDeleteDataRecords().size(), is(1));
-        assertThat(actualGroupedDataRecord1.getNonBatchRecords().size(), is(6));
-        GroupedDataRecord actualGroupedDataRecord2 = groupedDataRecordsIterator.next();
-        assertThat(actualGroupedDataRecord2.getTableName(), is("t2"));
-        assertThat(actualGroupedDataRecord2.getBatchInsertDataRecords().size(), is(1));
     }
     
     private DataRecord mockInsertDataRecord(final int id, final int userId, final int totalPrice) {

