This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 72c5a86b432 Fix DataRecordGroupEngine mergeUpdate and mergeDelete
(#29853)
72c5a86b432 is described below
commit 72c5a86b432c49506aefc69adcedcd076a51486e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jan 25 17:34:08 2024 +0800
Fix DataRecordGroupEngine mergeUpdate and mergeDelete (#29853)
* Fix DataRecordGroupEngine mergeUpdate and mergeDelete
* Improve DataRecordGroupEngineTest
* Improve E2EIncrementalTask to generate more types of changes
---
.../ingest/record/group/DataRecordGroupEngine.java | 29 ++----
.../record/group/DataRecordGroupEngineTest.java | 113 +++++++++++----------
.../pipeline/cases/task/E2EIncrementalTask.java | 44 ++++++--
3 files changed, 104 insertions(+), 82 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index f58558f26a8..82624900af8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -94,22 +94,22 @@ public final class DataRecordGroupEngine {
}
private void mergeUpdate(final DataRecord dataRecord, final
Map<DataRecord.Key, DataRecord> dataRecords) {
- DataRecord beforeDataRecord = checkUpdatedUniqueKey(dataRecord) ?
dataRecords.get(dataRecord.getOldKey()) : dataRecords.get(dataRecord.getKey());
+ DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
if (null == beforeDataRecord) {
dataRecords.put(dataRecord.getKey(), dataRecord);
return;
}
ShardingSpherePreconditions.checkState(PipelineSQLOperationType.DELETE
!= beforeDataRecord.getType(), () -> new UnsupportedSQLOperationException("Not
Delete"));
- if (checkUpdatedUniqueKey(dataRecord)) {
+ if (isUniqueKeyUpdated(dataRecord)) {
dataRecords.remove(dataRecord.getOldKey());
}
if (PipelineSQLOperationType.INSERT == beforeDataRecord.getType()) {
- DataRecord mergedDataRecord =
mergeColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(),
beforeDataRecord, dataRecord);
+ DataRecord mergedDataRecord =
mergeUpdateColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(),
beforeDataRecord, dataRecord);
dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
return;
}
if (PipelineSQLOperationType.UPDATE == beforeDataRecord.getType()) {
- DataRecord mergedDataRecord =
mergeColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(),
beforeDataRecord, dataRecord);
+ DataRecord mergedDataRecord =
mergeUpdateColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(),
beforeDataRecord, dataRecord);
dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
}
}
@@ -118,11 +118,12 @@ public final class DataRecordGroupEngine {
DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
ShardingSpherePreconditions.checkState(null == beforeDataRecord ||
PipelineSQLOperationType.DELETE != beforeDataRecord.getType(),
() -> new
PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
- if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE ==
beforeDataRecord.getType() && checkUpdatedUniqueKey(beforeDataRecord)) {
+ if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE ==
beforeDataRecord.getType() && isUniqueKeyUpdated(beforeDataRecord)) {
DataRecord mergedDataRecord = new
DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(),
dataRecord.getPosition(), dataRecord.getColumnCount());
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
mergedDataRecord.addColumn(new
Column(dataRecord.getColumn(i).getName(),
- dataRecord.getColumn(i).isUniqueKey() ?
beforeDataRecord.getColumn(i).getOldValue() :
beforeDataRecord.getColumn(i).getValue(), true,
dataRecord.getColumn(i).isUniqueKey()));
+ dataRecord.getColumn(i).isUniqueKey() ?
beforeDataRecord.getColumn(i).getOldValue() :
beforeDataRecord.getColumn(i).getValue(),
+ null, true, dataRecord.getColumn(i).isUniqueKey()));
}
dataRecords.remove(beforeDataRecord.getKey());
dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
@@ -131,7 +132,8 @@ public final class DataRecordGroupEngine {
}
}
- private boolean checkUpdatedUniqueKey(final DataRecord dataRecord) {
+ private boolean isUniqueKeyUpdated(final DataRecord dataRecord) {
+ // TODO Compatible with multiple unique indexes
for (Column each : dataRecord.getColumns()) {
if (each.isUniqueKey() && each.isUpdated()) {
return true;
@@ -140,25 +142,16 @@ public final class DataRecordGroupEngine {
return false;
}
- private DataRecord mergeColumn(final PipelineSQLOperationType type, final
String tableName, final DataRecord preDataRecord, final DataRecord
curDataRecord) {
+ private DataRecord mergeUpdateColumn(final PipelineSQLOperationType type,
final String tableName, final DataRecord preDataRecord, final DataRecord
curDataRecord) {
DataRecord result = new DataRecord(type, tableName,
curDataRecord.getPosition(), curDataRecord.getColumnCount());
for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
result.addColumn(new Column(
curDataRecord.getColumn(i).getName(),
- preDataRecord.getColumn(i).isUniqueKey()
- ?
mergeUniqueKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
- : null,
+ preDataRecord.getColumn(i).getOldValue(),
curDataRecord.getColumn(i).getValue(),
preDataRecord.getColumn(i).isUpdated() ||
curDataRecord.getColumn(i).isUpdated(),
curDataRecord.getColumn(i).isUniqueKey()));
}
return result;
}
-
- private Object mergeUniqueKeyOldValue(final Column beforeColumn, final
Column column) {
- if (beforeColumn.isUpdated()) {
- return beforeColumn.getOldValue();
- }
- return column.isUpdated() ? column.getOldValue() : null;
- }
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 5a3451f4ad8..504143998c0 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -28,11 +28,11 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
class DataRecordGroupEngineTest {
@@ -55,8 +55,8 @@ class DataRecordGroupEngineTest {
@Test
void assertDeleteBeforeInsert() {
- DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
- DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+ DataRecord beforeDataRecord = mockDeleteDataRecord(1, 10, 50);
+ DataRecord afterDataRecord = mockInsertDataRecord(1, 10, 100);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
@@ -64,105 +64,107 @@ class DataRecordGroupEngineTest {
@Test
void assertInsertBeforeUpdate() {
- DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
- DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+ DataRecord beforeDataRecord = mockInsertDataRecord(1, 10, 50);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 10, 200);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
assertThat(dataRecord.getTableName(), is("order"));
- assertNull(dataRecord.getColumn(0).getOldValue());
- assertThat(dataRecord.getColumn(0).getValue(), is(1));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null,
1, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
null, 10, true, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", null, 200, true, false));
+ }
+
+ private void assertColumnsMatched(final Column actual, final Column
expected) {
+ assertThat(actual.getName(), is(expected.getName()));
+ assertThat(actual.getOldValue(), is(expected.getOldValue()));
+ assertThat(actual.getValue(), is(expected.getValue()));
+ assertThat(actual.isUpdated(), is(expected.isUpdated()));
+ assertThat(actual.isUniqueKey(), is(expected.isUniqueKey()));
}
@Test
void assertInsertBeforeUpdatePrimaryKey() {
- DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
- DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+ DataRecord beforeDataRecord = mockInsertDataRecord(1, 10, 50);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 10, 50);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
assertThat(dataRecord.getTableName(), is("order"));
- assertNull(dataRecord.getColumn(0).getOldValue());
- assertThat(dataRecord.getColumn(0).getValue(), is(2));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertColumnsMatched(dataRecord.getColumn(0), new Column("id", null,
2, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
null, 10, true, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", null, 50, true, false));
}
@Test
void assertUpdateBeforeUpdate() {
- DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
- DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 10, 100);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 10, 200);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
assertThat(dataRecord.getTableName(), is("order"));
- assertNull(dataRecord.getColumn(0).getOldValue());
- assertThat(dataRecord.getColumn(0).getValue(), is(1));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 1,
false, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, 10, false, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, 200, true, false));
}
@Test
void assertUpdateBeforeUpdatePrimaryKey() {
- DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
- DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 10, 50);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 10, 200);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
assertThat(dataRecord.getTableName(), is("order"));
- assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
- assertThat(dataRecord.getColumn(0).getValue(), is(2));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2,
true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, 10, false, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, 200, true, false));
}
@Test
void assertUpdatePrimaryKeyBeforeUpdate() {
- DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
- DataRecord afterDataRecord = mockUpdateDataRecord(2, 2, 2);
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 10, 50);
+ DataRecord afterDataRecord = mockUpdateDataRecord(2, 10, 200);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
assertThat(dataRecord.getTableName(), is("order"));
- assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
- assertThat(dataRecord.getColumn(0).getValue(), is(2));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 2,
true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, 10, false, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, 200, true, false));
}
@Test
void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
- DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
- DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 10, 50);
+ DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 10, 50);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
assertThat(dataRecord.getTableName(), is("order"));
- assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
- assertThat(dataRecord.getColumn(0).getValue(), is(3));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1, 3,
true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, 10, false, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, 50, false, false));
}
@Test
void assertDeleteBeforeUpdate() {
- DataRecord beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
- DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+ DataRecord beforeDataRecord = mockDeleteDataRecord(1, 10, 50);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 20, 200);
assertThrows(UnsupportedSQLOperationException.class, () ->
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
}
@Test
void assertInsertBeforeDelete() {
- DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
- DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+ DataRecord beforeDataRecord = mockInsertDataRecord(1, 10, 50);
+ DataRecord afterDataRecord = mockDeleteDataRecord(1, 10, 50);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
@@ -170,8 +172,8 @@ class DataRecordGroupEngineTest {
@Test
void assertUpdateBeforeDelete() {
- DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
- DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 10, 50);
+ DataRecord afterDataRecord = mockDeleteDataRecord(1, 10, 50);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
@@ -179,17 +181,16 @@ class DataRecordGroupEngineTest {
@Test
void assertUpdatePrimaryKeyBeforeDelete() {
- DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
- DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 10, 50);
+ DataRecord afterDataRecord = mockDeleteDataRecord(2, 10, 50);
Collection<DataRecord> actual =
groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
DataRecord dataRecord = actual.iterator().next();
assertThat(dataRecord.getType(), is(PipelineSQLOperationType.DELETE));
assertThat(dataRecord.getTableName(), is("order"));
- assertNull(dataRecord.getColumn(0).getOldValue());
- assertThat(dataRecord.getColumn(0).getValue(), is(1));
- assertThat(dataRecord.getColumn(1).getValue(), is(1));
- assertThat(dataRecord.getColumn(2).getValue(), is(1));
+ assertColumnsMatched(dataRecord.getColumn(0), new Column("id", 1,
null, true, true));
+ assertColumnsMatched(dataRecord.getColumn(1), new Column("user_id",
10, null, true, false));
+ assertColumnsMatched(dataRecord.getColumn(2), new
Column("total_price", 50, null, true, false));
}
@Test
@@ -236,7 +237,7 @@ class DataRecordGroupEngineTest {
}
private DataRecord mockUpdateDataRecord(final int id, final int userId,
final int totalPrice) {
- return mockUpdateDataRecord("order", null, id, userId, totalPrice);
+ return mockUpdateDataRecord("order", id, id, userId, totalPrice);
}
private DataRecord mockUpdateDataRecord(final Integer oldId, final int id,
final int userId, final int totalPrice) {
@@ -244,14 +245,14 @@ class DataRecordGroupEngineTest {
}
private DataRecord mockUpdateDataRecord(final String tableName, final int
id, final int userId, final int totalPrice) {
- return mockUpdateDataRecord(tableName, null, id, userId, totalPrice);
+ return mockUpdateDataRecord(tableName, id, id, userId, totalPrice);
}
private DataRecord mockUpdateDataRecord(final String tableName, final
Integer oldId, final int id, final int userId, final int totalPrice) {
DataRecord result = new DataRecord(PipelineSQLOperationType.UPDATE,
tableName, new IngestPlaceholderPosition(), 3);
- result.addColumn(new Column("id", oldId, id, null != oldId, true));
- result.addColumn(new Column("user_id", userId, true, false));
- result.addColumn(new Column("total_price", totalPrice, true, false));
+ result.addColumn(new Column("id", oldId, id,
!Objects.deepEquals(oldId, id), true));
+ result.addColumn(new Column("user_id", userId, userId, false, false));
+ result.addColumn(new Column("total_price", 50, totalPrice, 50 !=
totalPrice, false));
return result;
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 07108123858..482bf4f3095 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -40,6 +40,8 @@ import java.time.OffsetDateTime;
import java.time.Year;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
@@ -79,16 +81,20 @@ public final class E2EIncrementalTask extends
BaseIncrementTask {
primaryKeys.add(each[0]);
insertOrder(each);
}
- ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
- // TODO 0000-00-00 00:00:00 now will cause consistency check
failed of MySQL.
- // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET
t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
- updateOrderById(primaryKeys.get(random.nextInt(0,
primaryKeys.size())));
+ Iterator<Object> primaryKeysIterator = primaryKeys.iterator();
+ Iterator<List<IncrementalAction>> incrementalActionsIterator =
Arrays.asList(
+ Arrays.asList(IncrementalAction.PLAIN_UPDATE,
IncrementalAction.UPDATE_NULL), Arrays.asList(IncrementalAction.UPDATE_NULL,
IncrementalAction.PLAIN_UPDATE),
+ Arrays.asList(IncrementalAction.PLAIN_UPDATE,
IncrementalAction.DELETE), Arrays.asList(IncrementalAction.UPDATE_NULL,
IncrementalAction.DELETE),
+ Collections.singletonList(IncrementalAction.PLAIN_UPDATE),
Collections.singletonList(IncrementalAction.UPDATE_NULL),
Collections.singletonList(IncrementalAction.DELETE)).iterator();
+ while (primaryKeysIterator.hasNext() &&
incrementalActionsIterator.hasNext()) {
+ doIncrementalChanges(primaryKeysIterator.next(),
incrementalActionsIterator.next());
}
- for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
- setNullToAllFields(primaryKeys.get(random.nextInt(0,
primaryKeys.size())));
- deleteOrderById(primaryKeys.remove(random.nextInt(0,
primaryKeys.size())));
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ while (primaryKeysIterator.hasNext()) {
+ doIncrementalChanges(primaryKeysIterator.next(),
Collections.singletonList(IncrementalAction.values()[random.nextInt(0,
IncrementalAction.values().length)]));
}
+ // TODO 0000-00-00 00:00:00 now will cause consistency check failed of
MySQL.
+ // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET
t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
log.info("increment task runnable execute successfully.");
}
@@ -106,6 +112,24 @@ public final class E2EIncrementalTask extends
BaseIncrementTask {
DataSourceExecuteUtils.execute(dataSource, sql, orderInsertData);
}
+ private void doIncrementalChanges(final Object orderId, final
List<IncrementalAction> actions) {
+ for (IncrementalAction each : actions) {
+ switch (each) {
+ case PLAIN_UPDATE:
+ updateOrderById(orderId);
+ break;
+ case UPDATE_NULL:
+ setNullToAllFields(orderId);
+ break;
+ case DELETE:
+ deleteOrderById(orderId);
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+
private void updateOrderById(final Object orderId) {
ThreadLocalRandom random = ThreadLocalRandom.current();
int randomInt = random.nextInt(-100, 100);
@@ -160,4 +184,8 @@ public final class E2EIncrementalTask extends
BaseIncrementTask {
DataSourceExecuteUtils.execute(dataSource, sql, new
Object[]{orderId});
}
}
+
+ private enum IncrementalAction {
+ PLAIN_UPDATE, UPDATE_NULL, DELETE
+ }
}