This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 79cabdc52f9 Refactor DataRecord.getKey() (#29515)
79cabdc52f9 is described below
commit 79cabdc52f9a565e01e3c491ee0354d5f319d55f
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 23 18:09:14 2023 +0800
Refactor DataRecord.getKey() (#29515)
* Refactor DataRecord.getKey()
* Move DataRecordGroupEngine
* Move DataRecordGroupEngine
---
.../core/importer/sink/PipelineDataSourceSink.java | 4 ++--
.../pipeline/core/ingest/record/DataRecord.java | 19 +++++----------
.../record/group}/DataRecordGroupEngine.java | 11 +++------
.../record/{ => group}/GroupedDataRecord.java | 3 ++-
.../core/ingest/record/DataRecordTest.java | 28 ++++++++--------------
.../record/group}/DataRecordGroupEngineTest.java | 7 +++---
6 files changed, 26 insertions(+), 46 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index c59c7068194..842a28a8dfd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -23,12 +23,12 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
-import
org.apache.shardingsphere.data.pipeline.core.importer.DataRecordGroupEngine;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.job.JobOperationType;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index 5e02714a5c1..c347c8921e0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -22,9 +22,11 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -45,9 +47,9 @@ public final class DataRecord extends Record {
private final List<Column> columns;
- private final List<Object> uniqueKeyValue = new LinkedList<>();
+ private final Collection<Object> uniqueKeyValue = new LinkedList<>();
- private final List<Object> oldUniqueKeyValues = new ArrayList<>();
+ private final Collection<Object> oldUniqueKeyValues = new LinkedList<>();
private String actualTableName;
@@ -103,16 +105,7 @@ public final class DataRecord extends Record {
* @return key
*/
public Key getKey() {
- return new Key(tableName, uniqueKeyValue);
- }
-
- /**
- * Get old key.
- *
- * @return key
- */
- public Key getOldKey() {
- return new Key(tableName, oldUniqueKeyValues);
+ return IngestDataChangeType.DELETE.equals(type) ? new Key(tableName,
oldUniqueKeyValues) : new Key(tableName, uniqueKeyValue);
}
@RequiredArgsConstructor
@@ -121,6 +114,6 @@ public final class DataRecord extends Record {
private final String tableName;
- private final List<Object> uniqueKeyValues;
+ private final Collection<Object> uniqueKeyValues;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
similarity index 88%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index 0f084944d59..cfc2cefc70b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.importer;
+package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord.Key;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
import java.util.Collection;
import java.util.Collections;
@@ -50,7 +49,7 @@ public final class DataRecordGroupEngine {
Map<String, Map<String, List<DataRecord>>> batchDataRecords = new
LinkedHashMap<>();
for (DataRecord each : records) {
tableNames.add(each.getTableName());
- if (duplicateKeys.getOrDefault(getKey(each), false)) {
+ if (duplicateKeys.getOrDefault(each.getKey(), false)) {
nonBatchRecords.computeIfAbsent(each.getTableName(), ignored
-> new LinkedList<>()).add(each);
} else {
batchDataRecords.computeIfAbsent(each.getTableName(), ignored
-> new HashMap<>()).computeIfAbsent(each.getType(), ignored -> new
LinkedList<>()).add(each);
@@ -63,16 +62,12 @@ public final class DataRecordGroupEngine {
private Map<Key, Boolean> getDuplicateKeys(final Collection<DataRecord>
records) {
Map<Key, Boolean> result = new HashMap<>();
for (DataRecord each : records) {
- Key key = getKey(each);
+ Key key = each.getKey();
result.put(key, result.containsKey(key));
}
return result;
}
- private Key getKey(final DataRecord record) {
- return IngestDataChangeType.DELETE.equals(record.getType()) ?
record.getOldKey() : record.getKey();
- }
-
private GroupedDataRecord getGroupedDataRecord(final String tableName,
final Map<String, List<DataRecord>> batchRecords, final List<DataRecord>
nonBatchRecords) {
return new GroupedDataRecord(tableName,
batchRecords.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
batchRecords.getOrDefault(IngestDataChangeType.UPDATE,
Collections.emptyList()),
batchRecords.getOrDefault(IngestDataChangeType.DELETE,
Collections.emptyList()), nonBatchRecords);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/GroupedDataRecord.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
similarity index 93%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/GroupedDataRecord.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
index b3324db81ae..daa0e47c177 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/GroupedDataRecord.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.record;
+package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import java.util.List;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
index 003423f4430..cdea349b563 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
@@ -26,29 +26,21 @@ import static org.hamcrest.MatcherAssert.assertThat;
class DataRecordTest {
- private DataRecord beforeDataRecord;
-
- private DataRecord afterDataRecord;
-
@Test
- void assertKeyEqual() {
- beforeDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1",
new IngestPlaceholderPosition(), 2);
+ void assertGetKeyWithUpdate() {
+ DataRecord beforeDataRecord = new
DataRecord(IngestDataChangeType.UPDATE, "foo_tbl", new
IngestPlaceholderPosition(), 1);
beforeDataRecord.addColumn(new Column("id", 1, true, true));
- beforeDataRecord.addColumn(new Column("name", "1", true, false));
- afterDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1",
new IngestPlaceholderPosition(), 2);
- afterDataRecord.addColumn(new Column("id", 1, true, true));
- afterDataRecord.addColumn(new Column("name", "2", true, false));
+ DataRecord afterDataRecord = new
DataRecord(IngestDataChangeType.UPDATE, "foo_tbl", new
IngestPlaceholderPosition(), 1);
+ afterDataRecord.addColumn(new Column("id", 2, 1, true, true));
assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
}
@Test
- void assertOldKeyEqual() {
- beforeDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1",
new IngestPlaceholderPosition(), 2);
- beforeDataRecord.addColumn(new Column("id", 1, true, true));
- beforeDataRecord.addColumn(new Column("name", "1", true, false));
- afterDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1",
new IngestPlaceholderPosition(), 2);
- afterDataRecord.addColumn(new Column("id", 1, 2, true, true));
- afterDataRecord.addColumn(new Column("name", "2", true, false));
- assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getOldKey()));
+ void assertGetKeyWithDelete() {
+ DataRecord beforeDataRecord = new
DataRecord(IngestDataChangeType.DELETE, "foo_tbl", new
IngestPlaceholderPosition(), 1);
+ beforeDataRecord.addColumn(new Column("id", 1, 2, true, true));
+ DataRecord afterDataRecord = new
DataRecord(IngestDataChangeType.DELETE, "foo_tbl", new
IngestPlaceholderPosition(), 1);
+ afterDataRecord.addColumn(new Column("id", 1, 3, true, true));
+ assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
similarity index 98%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 0be6fff94e1..ea14af84134 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.importer;
+package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.junit.jupiter.api.Test;
import java.util.Arrays;