This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a657be1f5e1 Recover DataRecordGroupEngine (#29776)
a657be1f5e1 is described below
commit a657be1f5e1fc9995d7637bc73b533e8330d360d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 19 17:43:47 2024 +0800
Recover DataRecordGroupEngine (#29776)
* Recover DataRecordGroupEngine
* Update unit test and E2E test
---
.../user-manual/error-code/sql-error-code.cn.md | 1 +
.../user-manual/error-code/sql-error-code.en.md | 1 +
...ipelineUnexpectedDataRecordOrderException.java} | 27 ++--
.../importer/sink/type/PipelineDataSourceSink.java | 21 +--
.../pipeline/core/ingest/record/DataRecord.java | 11 +-
.../ingest/record/group/DataRecordGroupEngine.java | 145 ++++++++++++++----
.../ingest/record/group/GroupedDataRecord.java | 8 +-
.../core/ingest/record/DataRecordTest.java | 6 +-
.../record/group/DataRecordGroupEngineTest.java | 168 ++++++++++++++-------
9 files changed, 261 insertions(+), 127 deletions(-)
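For context before the diff: the recovered engine first merges records that target the same key, then groups the survivors by table and operation type, and PipelineDataSourceSink batch-writes the delete, insert, and update buckets in that order. A minimal usage sketch against the signatures visible in this diff (class name, table, and column values are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
    import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
    
    public final class GroupEngineUsageSketch {
        
        public static void main(final String[] args) {
            // An INSERT followed by an UPDATE of the same row; Column args are (name, value, updated, uniqueKey).
            DataRecord insert = new DataRecord(PipelineSQLOperationType.INSERT, "t_order", new IngestPlaceholderPosition(), 2);
            insert.addColumn(new Column("id", 1, true, true));
            insert.addColumn(new Column("status", "init", true, false));
            DataRecord update = new DataRecord(PipelineSQLOperationType.UPDATE, "t_order", new IngestPlaceholderPosition(), 2);
            update.addColumn(new Column("id", 1, false, true));
            update.addColumn(new Column("status", "ok", true, false));
            // group() merges first (insert + update -> insert), then buckets the result by table and type.
            List<GroupedDataRecord> grouped = new DataRecordGroupEngine().group(Arrays.asList(insert, update));
            GroupedDataRecord tableGroup = grouped.get(0);
            // One merged INSERT carrying status "ok"; the update and delete buckets stay empty.
            System.out.println(tableGroup.getInsertDataRecords().size());
            System.out.println(tableGroup.getUpdateDataRecords().isEmpty() && tableGroup.getDeleteDataRecords().isEmpty());
        }
    }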
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 6f22a1ec204..2d8149b54d9 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -118,6 +118,7 @@ SQL error codes are provided as standard SQL State, Vendor Code, and a detailed error message,
 | HY000 | 18020 | Failed to get DDL for table \`%s\`. |
 | 42S01 | 18030 | Duplicate storage unit names \`%s\`. |
 | 42S02 | 18031 | Storage units names \`%s\` do not exist. |
+| HY000 | 18050 | Before data record is \`%s\`, after data record is \`%s\`. |
 | 08000 | 18051 | Data check table \`%s\` failed. |
 | 0A000 | 18052 | Unsupported pipeline database type \`%s\`. |
 | 0A000 | 18053 | Unsupported CRC32 data consistency calculate algorithm with database type \`%s\`. |
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 445b6842ac2..ee3f90f4c43 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -118,6 +118,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
 | HY000 | 18020 | Failed to get DDL for table \`%s\`. |
 | 42S01 | 18030 | Duplicate storage unit names \`%s\`. |
 | 42S02 | 18031 | Storage units names \`%s\` do not exist. |
+| HY000 | 18050 | Before data record is \`%s\`, after data record is \`%s\`. |
 | 08000 | 18051 | Data check table \`%s\` failed. |
 | 0A000 | 18052 | Unsupported pipeline database type \`%s\`. |
 | 0A000 | 18053 | Unsupported CRC32 data consistency calculate algorithm with database type \`%s\`. |
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
similarity index 53%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
index ef54b3c7743..e285cbf1166 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
@@ -15,25 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
+package org.apache.shardingsphere.data.pipeline.core.exception.data;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
+import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
-import java.util.Collection;
-
-@RequiredArgsConstructor
-@Getter
-public final class GroupedDataRecord {
-
- private final String tableName;
-
- private final Collection<DataRecord> batchInsertDataRecords;
-
- private final Collection<DataRecord> batchUpdateDataRecords;
+/**
+ * Pipeline unexpected data record order exception.
+ */
+public final class PipelineUnexpectedDataRecordOrderException extends PipelineSQLException {
- private final Collection<DataRecord> batchDeleteDataRecords;
+ private static final long serialVersionUID = 6023695604738387750L;
- private final Collection<DataRecord> nonBatchRecords;
+    public PipelineUnexpectedDataRecordOrderException(final DataRecord beforeDataRecord, final DataRecord afterDataRecord) {
+        super(XOpenSQLState.GENERAL_ERROR, 50, String.format("Before data record is `%s`, after data record is `%s`.", beforeDataRecord, afterDataRecord));
+ }
}
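The exception pairs XOpenSQLState.GENERAL_ERROR (SQL state HY000) with sub-code 50; the documentation rows added above list the resulting vendor code as 18050, presumably via the pipeline kernel's 18xxx code range. A hypothetical guard method (wrapper class and method name are illustrative) showing the ShardingSpherePreconditions pattern the group engine below uses with it:

    import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
    import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
    import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
    
    public final class OrderGuardSketch {
        
        // Hypothetical helper: a second INSERT is only legal if the record previously seen
        // at the same key was a DELETE; otherwise raise the new order exception.
        public static void checkInsertOrder(final DataRecord beforeDataRecord, final DataRecord dataRecord) {
            ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE == beforeDataRecord.getType(),
                    () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
        }
    }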
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index f75363e4180..dcbe4dcaf3a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -39,7 +39,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -72,15 +71,14 @@ public final class PipelineDataSourceSink implements PipelineSink {
@Override
    public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
-        Collection<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
+        List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
if (dataRecords.isEmpty()) {
return new PipelineJobProgressUpdatedParameter(0);
}
for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
- batchWrite(each.getBatchDeleteDataRecords());
- batchWrite(each.getBatchInsertDataRecords());
- batchWrite(each.getBatchUpdateDataRecords());
- sequentialWrite(each.getNonBatchRecords());
+ batchWrite(each.getDeleteDataRecords());
+ batchWrite(each.getInsertDataRecords());
+ batchWrite(each.getUpdateDataRecords());
}
        return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
}
@@ -105,17 +103,6 @@ public final class PipelineDataSourceSink implements PipelineSink {
}
}
-    private void sequentialWrite(final Collection<DataRecord> records) {
-        // TODO it's better use transaction, but execute delete maybe not effect when open transaction of PostgreSQL sometimes
- try {
- for (DataRecord each : records) {
- doWrite(Collections.singleton(each));
- }
- } catch (final SQLException ex) {
- throw new PipelineImporterJobWriteException(ex);
- }
- }
-
    private void doWrite(final Collection<DataRecord> records) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
boolean enableTransaction = records.size() > 1;
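Why the sequentialWrite() fallback could go: the old grouping routed records whose key appeared more than once to getNonBatchRecords() and wrote them row by row, while merge() now collapses same-key records before grouping, so each bucket reaching batchWrite() holds at most one record per key. A minimal sketch of that invariant (class name, table, and values are illustrative):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
    import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
    
    public final class NoDuplicateKeySketch {
        
        public static void main(final String[] args) {
            // DELETE then re-INSERT of key 1 in one ack window: previously a duplicate-key
            // pair bound for sequentialWrite(), now merged into the single trailing INSERT.
            DataRecord delete = new DataRecord(PipelineSQLOperationType.DELETE, "t_order", new IngestPlaceholderPosition(), 1);
            delete.addColumn(new Column("id", 1, null, true, true));
            DataRecord insert = new DataRecord(PipelineSQLOperationType.INSERT, "t_order", new IngestPlaceholderPosition(), 1);
            insert.addColumn(new Column("id", 1, true, true));
            List<DataRecord> merged = new DataRecordGroupEngine().merge(Arrays.asList(delete, insert));
            System.out.println(merged.size() + " record(s), type " + merged.get(0).getType()); // 1 record(s), type INSERT
        }
    }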
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index bf2abf59515..9e047d3e3f9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -105,7 +105,16 @@ public final class DataRecord extends Record {
* @return key
*/
public Key getKey() {
-        return PipelineSQLOperationType.DELETE == type ? new Key(tableName, oldUniqueKeyValues) : new Key(tableName, uniqueKeyValue);
+ return new Key(tableName, uniqueKeyValue);
+ }
+
+ /**
+ * Get old key.
+ *
+ * @return key
+ */
+ public Key getOldKey() {
+ return new Key(tableName, oldUniqueKeyValues);
}
@RequiredArgsConstructor
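The getKey()/getOldKey() split matters for UPDATEs that change the unique key: getKey() is built from the new unique key values and getOldKey() from the pre-update ones, which lets the group engine look up the earlier record after the key moved. A tiny sketch (class name and values are illustrative; the Column arguments match the tests below):

    import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
    import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
    
    public final class KeySplitSketch {
        
        public static void main(final String[] args) {
            // UPDATE that moves the unique key from 1 to 2 (Column args: name, oldValue, value, updated, uniqueKey).
            DataRecord record = new DataRecord(PipelineSQLOperationType.UPDATE, "t_order", new IngestPlaceholderPosition(), 1);
            record.addColumn(new Column("id", 1, 2, true, true));
            // getKey() carries the new unique key value (2); getOldKey() the old one (1).
            System.out.println(record.getKey().equals(record.getOldKey())); // false
        }
    }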
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index 0e9a27eac5e..75317b91bc6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -18,18 +18,18 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord.Key;
+import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.EnumMap;
import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
@@ -38,41 +38,124 @@ import java.util.stream.Collectors;
public final class DataRecordGroupEngine {
/**
- * Group by table and operation type.
+ * Merge data record.
+ * <pre>
+ * insert + insert -> exception
+ * update + insert -> exception
+ * delete + insert -> insert
+ * insert + update -> insert
+ * update + update -> update
+ * delete + update -> exception
+ * insert + delete -> delete
+ * update + delete -> delete
+ * delete + delete -> exception
+ * </pre>
*
- * @param records data records
+ * @param dataRecords data records
+ * @return merged data records
+ */
+ public List<DataRecord> merge(final List<DataRecord> dataRecords) {
+ Map<DataRecord.Key, DataRecord> result = new HashMap<>();
+ dataRecords.forEach(each -> {
+ if (PipelineSQLOperationType.INSERT.equals(each.getType())) {
+ mergeInsert(each, result);
+            } else if (PipelineSQLOperationType.UPDATE.equals(each.getType())) {
+ mergeUpdate(each, result);
+            } else if (PipelineSQLOperationType.DELETE.equals(each.getType())) {
+ mergeDelete(each, result);
+ }
+ });
+ return new ArrayList<>(result.values());
+ }
+
+ /**
+ * Group by table and type.
+ *
+ * @param dataRecords data records
* @return grouped data records
*/
-    public Collection<GroupedDataRecord> group(final Collection<DataRecord> records) {
-        Map<Key, Boolean> duplicateKeys = getDuplicateKeys(records);
-        Collection<String> tableNames = new LinkedHashSet<>();
-        Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
-        Map<String, Map<PipelineSQLOperationType, Collection<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
- for (DataRecord each : records) {
- tableNames.add(each.getTableName());
- if (duplicateKeys.getOrDefault(each.getKey(), false)) {
-                nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
-            } else {
-                batchDataRecords.computeIfAbsent(
-                        each.getTableName(), ignored -> new EnumMap<>(PipelineSQLOperationType.class)).computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
+ public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
+ List<GroupedDataRecord> result = new ArrayList<>(100);
+        List<DataRecord> mergedDataRecords = dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : merge(dataRecords);
+        Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+        for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
+            Map<PipelineSQLOperationType, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
+            result.add(new GroupedDataRecord(entry.getKey(), typeGroup.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
+                    typeGroup.getOrDefault(PipelineSQLOperationType.UPDATE, Collections.emptyList()), typeGroup.getOrDefault(PipelineSQLOperationType.DELETE, Collections.emptyList())));
+        }
+ }
+ return result;
+ }
+
+    private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
+        ShardingSpherePreconditions.checkState(null == beforeDataRecord
+                || PipelineSQLOperationType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
+ dataRecords.put(dataRecord.getKey(), dataRecord);
+ }
+
+    private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = checkUpdatedUniqueKey(dataRecord) ? dataRecords.get(dataRecord.getOldKey()) : dataRecords.get(dataRecord.getKey());
+        if (null == beforeDataRecord) {
+            dataRecords.put(dataRecord.getKey(), dataRecord);
+            return;
+        }
+        ShardingSpherePreconditions.checkState(!PipelineSQLOperationType.DELETE.equals(beforeDataRecord.getType()), () -> new UnsupportedSQLOperationException("Not Delete"));
+        if (checkUpdatedUniqueKey(dataRecord)) {
+            dataRecords.remove(dataRecord.getOldKey());
+        }
+        if (PipelineSQLOperationType.INSERT.equals(beforeDataRecord.getType())) {
+            DataRecord mergedDataRecord = mergeColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(), beforeDataRecord, dataRecord);
+            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+            return;
+        }
+        if (PipelineSQLOperationType.UPDATE.equals(beforeDataRecord.getType())) {
+            DataRecord mergedDataRecord = mergeColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(), beforeDataRecord, dataRecord);
+ dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+ }
+ }
+
+    private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
+        ShardingSpherePreconditions.checkState(null == beforeDataRecord
+                || !PipelineSQLOperationType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
+        if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedUniqueKey(beforeDataRecord)) {
+            DataRecord mergedDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), dataRecord.getPosition(), dataRecord.getColumnCount());
+            for (int i = 0; i < dataRecord.getColumnCount(); i++) {
+                mergedDataRecord.addColumn(new Column(dataRecord.getColumn(i).getName(),
+                        dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(), true, dataRecord.getColumn(i).isUniqueKey()));
+            }
+            dataRecords.remove(beforeDataRecord.getKey());
+            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+        } else {
+            dataRecords.put(dataRecord.getOldKey(), dataRecord);
+        }
+    }
+
+ private boolean checkUpdatedUniqueKey(final DataRecord dataRecord) {
+ for (Column each : dataRecord.getColumns()) {
+ if (each.isUniqueKey() && each.isUpdated()) {
+ return true;
}
}
-        return tableNames.stream().map(each -> getGroupedDataRecord(
-                each, batchDataRecords.getOrDefault(each, Collections.emptyMap()), nonBatchRecords.getOrDefault(each, Collections.emptyList()))).collect(Collectors.toList());
+ return false;
}
-    private Map<Key, Boolean> getDuplicateKeys(final Collection<DataRecord> records) {
- Map<Key, Boolean> result = new HashMap<>();
- for (DataRecord each : records) {
- Key key = each.getKey();
- result.put(key, result.containsKey(key));
+    private DataRecord mergeColumn(final PipelineSQLOperationType type, final String tableName, final DataRecord preDataRecord, final DataRecord curDataRecord) {
+        DataRecord result = new DataRecord(type, tableName, curDataRecord.getPosition(), curDataRecord.getColumnCount());
+ for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
+            result.addColumn(new Column(
+                    curDataRecord.getColumn(i).getName(),
+                    preDataRecord.getColumn(i).isUniqueKey() ? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i)) : null,
+                    curDataRecord.getColumn(i).getValue(),
+                    preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
+                    curDataRecord.getColumn(i).isUniqueKey()));
}
return result;
}
-    private GroupedDataRecord getGroupedDataRecord(final String tableName, final Map<PipelineSQLOperationType, Collection<DataRecord>> batchRecords, final Collection<DataRecord> nonBatchRecords) {
-        return new GroupedDataRecord(tableName, batchRecords.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
-                batchRecords.getOrDefault(PipelineSQLOperationType.UPDATE, Collections.emptyList()), batchRecords.getOrDefault(PipelineSQLOperationType.DELETE, Collections.emptyList()),
-                nonBatchRecords);
+    private Object mergePrimaryKeyOldValue(final Column beforeColumn, final Column column) {
+        return beforeColumn.isUpdated() ? beforeColumn.getOldValue() : (column.isUpdated() ? column.getOldValue() : null);
}
}
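To make the merge table in the javadoc above concrete, a minimal sketch of two of the rules (class name, table, and values are illustrative, shaped like the unit tests below):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
    import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
    import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
    
    public final class MergeRulesSketch {
        
        public static void main(final String[] args) {
            DataRecordGroupEngine engine = new DataRecordGroupEngine();
            // insert + delete -> delete: the pair collapses into the trailing DELETE.
            List<DataRecord> merged = engine.merge(Arrays.asList(record(PipelineSQLOperationType.INSERT, 1), record(PipelineSQLOperationType.DELETE, 1)));
            System.out.println(merged.size() + " -> " + merged.get(0).getType());
            // insert + insert on the same key -> the new order exception (documented above as HY000 / 18050).
            try {
                engine.merge(Arrays.asList(record(PipelineSQLOperationType.INSERT, 2), record(PipelineSQLOperationType.INSERT, 2)));
            } catch (final PipelineUnexpectedDataRecordOrderException ex) {
                System.out.println(ex.getMessage());
            }
        }
        
        private static DataRecord record(final PipelineSQLOperationType type, final int id) {
            DataRecord result = new DataRecord(type, "t_order", new IngestPlaceholderPosition(), 1);
            // Column args: name, oldValue, value, updated, uniqueKey.
            result.addColumn(new Column("id", id, id, false, true));
            return result;
        }
    }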
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
index ef54b3c7743..45d58c5381f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
@@ -29,11 +29,9 @@ public final class GroupedDataRecord {
private final String tableName;
- private final Collection<DataRecord> batchInsertDataRecords;
+ private final Collection<DataRecord> insertDataRecords;
- private final Collection<DataRecord> batchUpdateDataRecords;
+ private final Collection<DataRecord> updateDataRecords;
- private final Collection<DataRecord> batchDeleteDataRecords;
-
- private final Collection<DataRecord> nonBatchRecords;
+ private final Collection<DataRecord> deleteDataRecords;
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
index f10f4b10d13..c991138416c 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
@@ -27,7 +27,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
class DataRecordTest {
@Test
- void assertGetKeyWithUpdate() {
+ void assertGetKey() {
        DataRecord beforeDataRecord = new DataRecord(PipelineSQLOperationType.UPDATE, "foo_tbl", new IngestPlaceholderPosition(), 1);
        beforeDataRecord.addColumn(new Column("id", 1, true, true));
        DataRecord afterDataRecord = new DataRecord(PipelineSQLOperationType.UPDATE, "foo_tbl", new IngestPlaceholderPosition(), 1);
@@ -36,11 +36,11 @@ class DataRecordTest {
}
@Test
- void assertGetKeyWithDelete() {
+ void assertGetOldKey() {
        DataRecord beforeDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, "foo_tbl", new IngestPlaceholderPosition(), 1);
        beforeDataRecord.addColumn(new Column("id", 1, 2, true, true));
        DataRecord afterDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, "foo_tbl", new IngestPlaceholderPosition(), 1);
        afterDataRecord.addColumn(new Column("id", 1, 3, true, true));
-        assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
+        assertThat(beforeDataRecord.getOldKey(), is(afterDataRecord.getOldKey()));
}
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 64a6b84769b..5a3451f4ad8 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -18,149 +18,209 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
class DataRecordGroupEngineTest {
+    private final DataRecordGroupEngine groupEngine = new DataRecordGroupEngine();
+
+ @Test
+ void assertInsertBeforeInsert() {
+ DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+ }
+
+ @Test
+ void assertUpdateBeforeInsert() {
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 2);
+ DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+ }
+
@Test
void assertDeleteBeforeInsert() {
DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
}
@Test
void assertInsertBeforeUpdate() {
DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertNull(dataRecord.getColumn(0).getOldValue());
+ assertThat(dataRecord.getColumn(0).getValue(), is(1));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
void assertInsertBeforeUpdatePrimaryKey() {
DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
- DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(PipelineSQLOperationType.INSERT));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertNull(dataRecord.getColumn(0).getOldValue());
+ assertThat(dataRecord.getColumn(0).getValue(), is(2));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
void assertUpdateBeforeUpdate() {
DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertNull(dataRecord.getColumn(0).getOldValue());
+ assertThat(dataRecord.getColumn(0).getValue(), is(1));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
void assertUpdateBeforeUpdatePrimaryKey() {
DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+ assertThat(dataRecord.getColumn(0).getValue(), is(2));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ }
+
+ @Test
+ void assertUpdatePrimaryKeyBeforeUpdate() {
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+ DataRecord afterDataRecord = mockUpdateDataRecord(2, 2, 2);
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getBatchUpdateDataRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+ assertThat(dataRecord.getColumn(0).getValue(), is(2));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
}
@Test
void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        GroupedDataRecord actualGroupedDataRecord = actual.iterator().next();
-        assertThat(actualGroupedDataRecord.getBatchUpdateDataRecords().size(), is(2));
-        Iterator<DataRecord> batchUpdateDataRecords = actualGroupedDataRecord.getBatchUpdateDataRecords().iterator();
-        DataRecord actualDataRecord1 = batchUpdateDataRecords.next();
-        assertThat(actualDataRecord1.getType(), is(PipelineSQLOperationType.UPDATE));
-        assertThat(actualDataRecord1.getTableName(), is("order"));
-        assertThat(actualDataRecord1.getColumn(0).getOldValue(), is(1));
-        assertThat(actualDataRecord1.getColumn(0).getValue(), is(2));
-        assertThat(actualDataRecord1.getColumn(1).getValue(), is(1));
-        assertThat(actualDataRecord1.getColumn(2).getValue(), is(1));
-        DataRecord actualDataRecord2 = batchUpdateDataRecords.next();
-        assertThat(actualDataRecord2.getType(), is(PipelineSQLOperationType.UPDATE));
-        assertThat(actualDataRecord2.getTableName(), is("order"));
-        assertThat(actualDataRecord2.getColumn(0).getOldValue(), is(2));
-        assertThat(actualDataRecord2.getColumn(0).getValue(), is(3));
-        assertThat(actualDataRecord2.getColumn(1).getValue(), is(2));
-        assertThat(actualDataRecord2.getColumn(2).getValue(), is(2));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+ assertThat(dataRecord.getColumn(0).getValue(), is(3));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ }
+
+ @Test
+ void assertDeleteBeforeUpdate() {
+ DataRecord beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        assertThrows(UnsupportedSQLOperationException.class, () -> groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
}
@Test
void assertInsertBeforeDelete() {
DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
}
@Test
void assertUpdateBeforeDelete() {
DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
}
@Test
void assertUpdatePrimaryKeyBeforeDelete() {
DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
-        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<DataRecord> actual = groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(PipelineSQLOperationType.DELETE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertNull(dataRecord.getColumn(0).getOldValue());
+ assertThat(dataRecord.getColumn(0).getValue(), is(1));
+ assertThat(dataRecord.getColumn(1).getValue(), is(1));
+ assertThat(dataRecord.getColumn(2).getValue(), is(1));
}
-    private void assertDataRecordsMatched(final Collection<DataRecord> actualRecords, final List<DataRecord> expectedRecords) {
-        for (int i = 0; i < actualRecords.size(); i++) {
-            assertThat(actualRecords.iterator().next(), sameInstance(expectedRecords.get(0)));
-        }
+ @Test
+ void assertDeleteBeforeDelete() {
+ DataRecord beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> groupEngine.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
}
@Test
void assertGroup() {
- List<DataRecord> dataRecords = Arrays.asList(
+ List<DataRecord> dataRecords = mockDataRecords();
+        List<GroupedDataRecord> groupedDataRecords = groupEngine.group(dataRecords);
+        assertThat(groupedDataRecords.size(), is(2));
+        assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
+        assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
+        assertThat(groupedDataRecords.get(0).getInsertDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getUpdateDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getDeleteDataRecords().size(), is(1));
+ }
+
+ private List<DataRecord> mockDataRecords() {
+ return Arrays.asList(
mockInsertDataRecord("t1", 1, 1, 1),
mockUpdateDataRecord("t1", 1, 2, 1),
mockUpdateDataRecord("t1", 1, 2, 2),
mockUpdateDataRecord("t1", 2, 1, 1),
mockUpdateDataRecord("t1", 2, 2, 1),
mockUpdateDataRecord("t1", 2, 2, 2),
- mockInsertDataRecord("t1", 10, 10, 10),
mockDeleteDataRecord("t1", 3, 1, 1),
mockInsertDataRecord("t2", 1, 1, 1));
-        Collection<GroupedDataRecord> groupedDataRecords = new DataRecordGroupEngine().group(dataRecords);
-        assertThat(groupedDataRecords.size(), is(2));
-        Iterator<GroupedDataRecord> groupedDataRecordsIterator = groupedDataRecords.iterator();
-        GroupedDataRecord actualGroupedDataRecord1 = groupedDataRecordsIterator.next();
-        assertThat(actualGroupedDataRecord1.getTableName(), is("t1"));
-        assertThat(actualGroupedDataRecord1.getBatchInsertDataRecords().size(), is(1));
-        assertThat(actualGroupedDataRecord1.getBatchUpdateDataRecords().size(), is(0));
-        assertThat(actualGroupedDataRecord1.getBatchDeleteDataRecords().size(), is(1));
-        assertThat(actualGroupedDataRecord1.getNonBatchRecords().size(), is(6));
-        GroupedDataRecord actualGroupedDataRecord2 = groupedDataRecordsIterator.next();
-        assertThat(actualGroupedDataRecord2.getTableName(), is("t2"));
-        assertThat(actualGroupedDataRecord2.getBatchInsertDataRecords().size(), is(1));
}
    private DataRecord mockInsertDataRecord(final int id, final int userId, final int totalPrice) {