This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 72c5a86b432 Fix DataRecordGroupEngine mergeUpdate and mergeDelete (#29853)
72c5a86b432 is described below

commit 72c5a86b432c49506aefc69adcedcd076a51486e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jan 25 17:34:08 2024 +0800

    Fix DataRecordGroupEngine mergeUpdate and mergeDelete (#29853)
    
    * Fix DataRecordGroupEngine mergeUpdate and mergeDelete
    
    * Improve DataRecordGroupEngineTest
    
    * Improve E2EIncrementalTask to generate more types of changes
---
 .../ingest/record/group/DataRecordGroupEngine.java |  29 ++----
 .../record/group/DataRecordGroupEngineTest.java    | 113 +++++++++++----------
 .../pipeline/cases/task/E2EIncrementalTask.java    |  44 ++++++--
 3 files changed, 104 insertions(+), 82 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index f58558f26a8..82624900af8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -94,22 +94,22 @@ public final class DataRecordGroupEngine {
     }
     
     private void mergeUpdate(final DataRecord dataRecord, final 
Map<DataRecord.Key, DataRecord> dataRecords) {
-        DataRecord beforeDataRecord = checkUpdatedUniqueKey(dataRecord) ? 
dataRecords.get(dataRecord.getOldKey()) : dataRecords.get(dataRecord.getKey());
+        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
         if (null == beforeDataRecord) {
             dataRecords.put(dataRecord.getKey(), dataRecord);
             return;
         }
         ShardingSpherePreconditions.checkState(PipelineSQLOperationType.DELETE 
!= beforeDataRecord.getType(), () -> new UnsupportedSQLOperationException("Not 
Delete"));
-        if (checkUpdatedUniqueKey(dataRecord)) {
+        if (isUniqueKeyUpdated(dataRecord)) {
             dataRecords.remove(dataRecord.getOldKey());
         }
         if (PipelineSQLOperationType.INSERT == beforeDataRecord.getType()) {
-            DataRecord mergedDataRecord = 
mergeColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(), 
beforeDataRecord, dataRecord);
+            DataRecord mergedDataRecord = 
mergeUpdateColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(), 
beforeDataRecord, dataRecord);
             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
             return;
         }
         if (PipelineSQLOperationType.UPDATE == beforeDataRecord.getType()) {
-            DataRecord mergedDataRecord = 
mergeColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(), 
beforeDataRecord, dataRecord);
+            DataRecord mergedDataRecord = 
mergeUpdateColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(), 
beforeDataRecord, dataRecord);
             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
         }
     }
@@ -118,11 +118,12 @@ public final class DataRecordGroupEngine {
         DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
         ShardingSpherePreconditions.checkState(null == beforeDataRecord || 
PipelineSQLOperationType.DELETE != beforeDataRecord.getType(),
                 () -> new 
PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
-        if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE == 
beforeDataRecord.getType() && checkUpdatedUniqueKey(beforeDataRecord)) {
+        if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE == 
beforeDataRecord.getType() && isUniqueKeyUpdated(beforeDataRecord)) {
             DataRecord mergedDataRecord = new 
DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), 
dataRecord.getPosition(), dataRecord.getColumnCount());
             for (int i = 0; i < dataRecord.getColumnCount(); i++) {
                 mergedDataRecord.addColumn(new 
Column(dataRecord.getColumn(i).getName(),
-                        dataRecord.getColumn(i).isUniqueKey() ? 
beforeDataRecord.getColumn(i).getOldValue() : 
beforeDataRecord.getColumn(i).getValue(), true, 
dataRecord.getColumn(i).isUniqueKey()));
+                        dataRecord.getColumn(i).isUniqueKey() ? 
beforeDataRecord.getColumn(i).getOldValue() : 
beforeDataRecord.getColumn(i).getValue(),
+                        null, true, dataRecord.getColumn(i).isUniqueKey()));
             }
             dataRecords.remove(beforeDataRecord.getKey());
             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
@@ -131,7 +132,8 @@ public final class DataRecordGroupEngine {
         }
     }
     
-    private boolean checkUpdatedUniqueKey(final DataRecord dataRecord) {
+    private boolean isUniqueKeyUpdated(final DataRecord dataRecord) {
+        // TODO Compatible with multiple unique indexes
         for (Column each : dataRecord.getColumns()) {
             if (each.isUniqueKey() && each.isUpdated()) {
                 return true;
@@ -140,25 +142,16 @@ public final class DataRecordGroupEngine {
         return false;
     }
     
-    private DataRecord mergeColumn(final PipelineSQLOperationType type, final 
String tableName, final DataRecord preDataRecord, final DataRecord 
curDataRecord) {
+    private DataRecord mergeUpdateColumn(final PipelineSQLOperationType type, 
final String tableName, final DataRecord preDataRecord, final DataRecord 
curDataRecord) {
         DataRecord result = new DataRecord(type, tableName, 
curDataRecord.getPosition(), curDataRecord.getColumnCount());
         for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
             result.addColumn(new Column(
                     curDataRecord.getColumn(i).getName(),
-                    preDataRecord.getColumn(i).isUniqueKey()
-                            ? 
mergeUniqueKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
-                            : null,
+                    preDataRecord.getColumn(i).getOldValue(),
                     curDataRecord.getColumn(i).getValue(),
                     preDataRecord.getColumn(i).isUpdated() || 
curDataRecord.getColumn(i).isUpdated(),
                     curDataRecord.getColumn(i).isUniqueKey()));
         }
         return result;
     }
-    
-    private Object mergeUniqueKeyOldValue(final Column beforeColumn, final 
Column column) {
-        if (beforeColumn.isUpdated()) {
-            return beforeColumn.getOldValue();
-        }
-        return column.isUpdated() ? column.getOldValue() : null;
-    }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 5a3451f4ad8..504143998c0 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -28,11 +28,11 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 class DataRecordGroupEngineTest {
@@ -55,8 +55,8 @@ class DataRecordGroupEngineTest {
     
     @Test
     void assertDeleteBeforeInsert() {
-        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
-        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 10, 50);
+        DataRecord afterDataRecord = mockInsertDataRecord(1, 10, 100);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
@@ -64,105 +64,107 @@ class DataRecordGroupEngineTest {
     
     @Test
     void assertInsertBeforeUpdate() {
-        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 10, 50);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 10, 200);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
         assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null, 
1, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
null, 10, true, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", null, 200, true, false));
+    }
+    
+    private void assertColumnsMatched(final Column actual, final Column 
expected) {
+        assertThat(actual.getName(), is(expected.getName()));
+        assertThat(actual.getOldValue(), is(expected.getOldValue()));
+        assertThat(actual.getValue(), is(expected.getValue()));
+        assertThat(actual.isUpdated(), is(expected.isUpdated()));
+        assertThat(actual.isUniqueKey(), is(expected.isUniqueKey()));
     }
     
     @Test
     void assertInsertBeforeUpdatePrimaryKey() {
-        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 10, 50);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 10, 50);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
         assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null, 
2, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
null, 10, true, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", null, 50, true, false));
     }
     
     @Test
     void assertUpdateBeforeUpdate() {
-        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 10, 100);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 10, 200);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 1, 
false, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
     }
     
     @Test
     void assertUpdateBeforeUpdatePrimaryKey() {
-        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 10, 50);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 10, 200);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2, 
true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
     }
     
     @Test
     void assertUpdatePrimaryKeyBeforeUpdate() {
-        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        DataRecord afterDataRecord = mockUpdateDataRecord(2, 2, 2);
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 10, 50);
+        DataRecord afterDataRecord = mockUpdateDataRecord(2, 10, 200);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2, 
true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 200, true, false));
     }
     
     @Test
     void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
-        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 10, 50);
+        DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 10, 50);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
         assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(3));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 3, 
true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, 10, false, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, 50, false, false));
     }
     
     @Test
     void assertDeleteBeforeUpdate() {
-        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
-        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 10, 50);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 20, 200);
         assertThrows(UnsupportedSQLOperationException.class, () -> 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
     }
     
     @Test
     void assertInsertBeforeDelete() {
-        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 10, 50);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 10, 50);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
@@ -170,8 +172,8 @@ class DataRecordGroupEngineTest {
     
     @Test
     void assertUpdateBeforeDelete() {
-        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 10, 50);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 10, 50);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
@@ -179,17 +181,16 @@ class DataRecordGroupEngineTest {
     
     @Test
     void assertUpdatePrimaryKeyBeforeDelete() {
-        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 10, 50);
+        DataRecord afterDataRecord = mockDeleteDataRecord(2, 10, 50);
         Collection<DataRecord> actual = 
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         DataRecord dataRecord = actual.iterator().next();
         assertThat(dataRecord.getType(), is(PipelineSQLOperationType.DELETE));
         assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(1));
-        assertThat(dataRecord.getColumn(2).getValue(), is(1));
+        assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 
null, true, true));
+        assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id", 
10, null, true, false));
+        assertColumnsMatched(dataRecord.getColumn(2), new 
Column("total_price", 50, null, true, false));
     }
     
     @Test
@@ -236,7 +237,7 @@ class DataRecordGroupEngineTest {
     }
     
     private DataRecord mockUpdateDataRecord(final int id, final int userId, 
final int totalPrice) {
-        return mockUpdateDataRecord("order", null, id, userId, totalPrice);
+        return mockUpdateDataRecord("order", id, id, userId, totalPrice);
     }
     
     private DataRecord mockUpdateDataRecord(final Integer oldId, final int id, 
final int userId, final int totalPrice) {
@@ -244,14 +245,14 @@ class DataRecordGroupEngineTest {
     }
     
     private DataRecord mockUpdateDataRecord(final String tableName, final int 
id, final int userId, final int totalPrice) {
-        return mockUpdateDataRecord(tableName, null, id, userId, totalPrice);
+        return mockUpdateDataRecord(tableName, id, id, userId, totalPrice);
     }
     
     private DataRecord mockUpdateDataRecord(final String tableName, final 
Integer oldId, final int id, final int userId, final int totalPrice) {
         DataRecord result = new DataRecord(PipelineSQLOperationType.UPDATE, 
tableName, new IngestPlaceholderPosition(), 3);
-        result.addColumn(new Column("id", oldId, id, null != oldId, true));
-        result.addColumn(new Column("user_id", userId, true, false));
-        result.addColumn(new Column("total_price", totalPrice, true, false));
+        result.addColumn(new Column("id", oldId, id, 
!Objects.deepEquals(oldId, id), true));
+        result.addColumn(new Column("user_id", userId, userId, false, false));
+        result.addColumn(new Column("total_price", 50, totalPrice, 50 != 
totalPrice, false));
         return result;
     }
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 07108123858..482bf4f3095 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -40,6 +40,8 @@ import java.time.OffsetDateTime;
 import java.time.Year;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
@@ -79,16 +81,20 @@ public final class E2EIncrementalTask extends 
BaseIncrementTask {
             primaryKeys.add(each[0]);
             insertOrder(each);
         }
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-        for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
-            // TODO 0000-00-00 00:00:00 now will cause consistency check 
failed of MySQL.
-            // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET 
t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
-            updateOrderById(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
+        Iterator<Object> primaryKeysIterator = primaryKeys.iterator();
+        Iterator<List<IncrementalAction>> incrementalActionsIterator = 
Arrays.asList(
+                Arrays.asList(IncrementalAction.PLAIN_UPDATE, 
IncrementalAction.UPDATE_NULL), Arrays.asList(IncrementalAction.UPDATE_NULL, 
IncrementalAction.PLAIN_UPDATE),
+                Arrays.asList(IncrementalAction.PLAIN_UPDATE, 
IncrementalAction.DELETE), Arrays.asList(IncrementalAction.UPDATE_NULL, 
IncrementalAction.DELETE),
+                Collections.singletonList(IncrementalAction.PLAIN_UPDATE), 
Collections.singletonList(IncrementalAction.UPDATE_NULL), 
Collections.singletonList(IncrementalAction.DELETE)).iterator();
+        while (primaryKeysIterator.hasNext() && 
incrementalActionsIterator.hasNext()) {
+            doIncrementalChanges(primaryKeysIterator.next(), 
incrementalActionsIterator.next());
         }
-        for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
-            setNullToAllFields(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
-            deleteOrderById(primaryKeys.remove(random.nextInt(0, 
primaryKeys.size())));
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (primaryKeysIterator.hasNext()) {
+            doIncrementalChanges(primaryKeysIterator.next(), 
Collections.singletonList(IncrementalAction.values()[random.nextInt(0, 
IncrementalAction.values().length)]));
         }
+        // TODO 0000-00-00 00:00:00 now will cause consistency check failed of 
MySQL.
+        // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET 
t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
         log.info("increment task runnable execute successfully.");
     }
     
@@ -106,6 +112,24 @@ public final class E2EIncrementalTask extends 
BaseIncrementTask {
         DataSourceExecuteUtils.execute(dataSource, sql, orderInsertData);
     }
     
+    private void doIncrementalChanges(final Object orderId, final 
List<IncrementalAction> actions) {
+        for (IncrementalAction each : actions) {
+            switch (each) {
+                case PLAIN_UPDATE:
+                    updateOrderById(orderId);
+                    break;
+                case UPDATE_NULL:
+                    setNullToAllFields(orderId);
+                    break;
+                case DELETE:
+                    deleteOrderById(orderId);
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+    }
+    
     private void updateOrderById(final Object orderId) {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int randomInt = random.nextInt(-100, 100);
@@ -160,4 +184,8 @@ public final class E2EIncrementalTask extends 
BaseIncrementTask {
             DataSourceExecuteUtils.execute(dataSource, sql, new 
Object[]{orderId});
         }
     }
+    
+    private enum IncrementalAction {
+        PLAIN_UPDATE, UPDATE_NULL, DELETE
+    }
 }

Reply via email to