This is an automated email from the ASF dual-hosted git repository.
totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 98a7624a782 Refactor DataRecordGroupEngine (#29514)
98a7624a782 is described below
commit 98a7624a782a2c1ece4b483d6b8c7b9b7e50caea
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 23 17:30:41 2023 +0800
Refactor DataRecordGroupEngine (#29514)
---
.../core/importer/DataRecordGroupEngine.java | 80 +++++++++++++++++++
.../pipeline/core/importer/DataRecordMerger.java | 92 ----------------------
.../core/importer/sink/PipelineDataSourceSink.java | 9 ++-
.../pipeline/core/ingest/record/DataRecord.java | 2 +-
...gerTest.java => DataRecordGroupEngineTest.java} | 24 +++---
5 files changed, 97 insertions(+), 110 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
new file mode 100644
index 00000000000..0f084944d59
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.importer;
+
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord.Key;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Data Record group engine.
+ */
+public final class DataRecordGroupEngine {
+
+    /**
+     * Group by table and operation type.
+     *
+     * @param records data records
+     * @return grouped data records
+     */
+    public Collection<GroupedDataRecord> group(final Collection<DataRecord> records) {
+        Map<Key, Boolean> duplicateKeys = getDuplicateKeys(records);
+        Collection<String> tableNames = new LinkedHashSet<>();
+        Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
+        Map<String, Map<String, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
+        for (DataRecord each : records) {
+            tableNames.add(each.getTableName());
+            if (duplicateKeys.getOrDefault(getKey(each), false)) {
+                nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
+            } else {
+                batchDataRecords.computeIfAbsent(each.getTableName(), ignored -> new HashMap<>()).computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
+            }
+        }
+        return tableNames.stream().map(each -> getGroupedDataRecord(
+                each, batchDataRecords.getOrDefault(each, Collections.emptyMap()), nonBatchRecords.getOrDefault(each, Collections.emptyList()))).collect(Collectors.toList());
+    }
+
+    private Map<Key, Boolean> getDuplicateKeys(final Collection<DataRecord> records) {
+        Map<Key, Boolean> result = new HashMap<>();
+        for (DataRecord each : records) {
+            Key key = getKey(each);
+            result.put(key, result.containsKey(key));
+        }
+        return result;
+    }
+
+    private Key getKey(final DataRecord record) {
+        return IngestDataChangeType.DELETE.equals(record.getType()) ? record.getOldKey() : record.getKey();
+    }
+
+    private GroupedDataRecord getGroupedDataRecord(final String tableName, final Map<String, List<DataRecord>> batchRecords, final List<DataRecord> nonBatchRecords) {
+        return new GroupedDataRecord(tableName, batchRecords.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
+                batchRecords.getOrDefault(IngestDataChangeType.UPDATE, Collections.emptyList()), batchRecords.getOrDefault(IngestDataChangeType.DELETE, Collections.emptyList()), nonBatchRecords);
+    }
+}
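
For context, a minimal usage sketch of the new engine (illustrative only, not part of this commit; the buildInsertRecord/buildDeleteRecord helpers are hypothetical stand-ins, since DataRecord construction is not shown in this diff):

    // The two t_order records share the same key, so the engine routes both to the
    // ordered non-batch bucket; the t_order_item record has a unique key and stays batchable.
    DataRecord insertOrder = buildInsertRecord("t_order", 1);
    DataRecord deleteOrder = buildDeleteRecord("t_order", 1);
    DataRecord insertItem = buildInsertRecord("t_order_item", 100);
    Collection<GroupedDataRecord> grouped = new DataRecordGroupEngine().group(Arrays.asList(insertOrder, deleteOrder, insertItem));
    for (GroupedDataRecord each : grouped) {
        each.getNonBatchRecords();        // must be flushed one by one, in arrival order
        each.getBatchInsertDataRecords(); // safe to flush as a single batched INSERT
    }
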
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
deleted file mode 100644
index b9759f49fc1..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.importer;
-
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord.Key;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * Data Record merger.
- */
-public final class DataRecordMerger {
-
-    /**
-     * Group by table and type.
-     *
-     * @param dataRecords data records
-     * @return grouped data records
-     */
-    public Collection<GroupedDataRecord> group(final Collection<DataRecord> dataRecords) {
-        int insertCount = 0;
-        Map<Key, Boolean> duplicateKeyMap = new HashMap<>();
-        Set<String> tableNames = new LinkedHashSet<>();
-        for (DataRecord each : dataRecords) {
-            if (IngestDataChangeType.INSERT.equals(each.getType())) {
-                insertCount++;
-            }
-            tableNames.add(each.getTableName());
-            Key key = getKeyFromDataRecord(each);
-            duplicateKeyMap.put(key, duplicateKeyMap.containsKey(key));
-        }
-        List<GroupedDataRecord> result = new ArrayList<>(100);
-        if (insertCount == dataRecords.size()) {
-            Map<String, List<DataRecord>> tableGroup = dataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
-            for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
-                result.add(new GroupedDataRecord(entry.getKey(), entry.getValue(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
-            }
-            return result;
-        }
-        Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
-        Map<String, Map<String, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
-        for (DataRecord each : dataRecords) {
-            Key key = getKeyFromDataRecord(each);
-            if (duplicateKeyMap.getOrDefault(key, false)) {
-                nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
-                continue;
-            }
-            Map<String, List<DataRecord>> recordMap = batchDataRecords.computeIfAbsent(each.getTableName(), ignored -> new HashMap<>());
-            recordMap.computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
-        }
-        for (String each : tableNames) {
-            Map<String, List<DataRecord>> batchMap = batchDataRecords.getOrDefault(each, Collections.emptyMap());
-            List<DataRecord> nonBatchRecordMap = nonBatchRecords.getOrDefault(each, Collections.emptyList());
-            result.add(new GroupedDataRecord(each, batchMap.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
-                    batchMap.getOrDefault(IngestDataChangeType.UPDATE, Collections.emptyList()), batchMap.getOrDefault(IngestDataChangeType.DELETE, Collections.emptyList()), nonBatchRecordMap));
-        }
-        return result;
-    }
-
-    private Key getKeyFromDataRecord(final DataRecord dataRecord) {
-        return IngestDataChangeType.DELETE.equals(dataRecord.getType()) ? dataRecord.getOldKey() : dataRecord.getKey();
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 5b745667017..c59c7068194 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -23,7 +23,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
-import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
+import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
@@ -55,8 +55,6 @@ import java.util.stream.Collectors;
@Slf4j
public final class PipelineDataSourceSink implements PipelineSink {
-    private static final DataRecordMerger MERGER = new DataRecordMerger();
-
    @Getter(AccessLevel.PROTECTED)
    private final ImporterConfiguration importerConfig;
@@ -66,6 +64,8 @@ public final class PipelineDataSourceSink implements PipelineSink {
    private final PipelineImportSQLBuilder importSQLBuilder;
+    private final DataRecordGroupEngine groupEngine;
+
    private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
    private final AtomicReference<Statement> updateStatement = new AtomicReference<>();
@@ -77,6 +77,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
        this.dataSourceManager = dataSourceManager;
        rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
        importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
+        groupEngine = new DataRecordGroupEngine();
    }
    @Override
@@ -100,7 +101,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
                insertRecordNumber++;
            }
        }
-        Collection<GroupedDataRecord> groupedDataRecords = MERGER.group(dataRecords);
+        Collection<GroupedDataRecord> groupedDataRecords = groupEngine.group(dataRecords);
        for (GroupedDataRecord each : groupedDataRecords) {
            flushInternal(dataSource, each.getBatchDeleteDataRecords());
            flushInternal(dataSource, each.getBatchInsertDataRecords());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index 6096c358e10..5e02714a5c1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -115,8 +115,8 @@ public final class DataRecord extends Record {
        return new Key(tableName, oldUniqueKeyValues);
    }
-    @EqualsAndHashCode
    @RequiredArgsConstructor
+    @EqualsAndHashCode
    public static class Key {
        private final String tableName;
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
similarity index 88%
rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
index aa55aaccd6b..0be6fff94e1 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
@@ -33,15 +33,13 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
-class DataRecordMergerTest {
-
-    private final DataRecordMerger dataRecordMerger = new DataRecordMerger();
+class DataRecordGroupEngineTest {
    @Test
    void assertDeleteBeforeInsert() {
        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
    }
@@ -50,7 +48,7 @@ class DataRecordMergerTest {
    void assertInsertBeforeUpdate() {
        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
    }
@@ -59,7 +57,7 @@ class DataRecordMergerTest {
    void assertInsertBeforeUpdatePrimaryKey() {
        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
        DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
    }
@@ -68,7 +66,7 @@ class DataRecordMergerTest {
    void assertUpdateBeforeUpdate() {
        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
    }
@@ -77,7 +75,7 @@ class DataRecordMergerTest {
    void assertUpdateBeforeUpdatePrimaryKey() {
        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        assertDataRecordsMatched(actual.iterator().next().getBatchUpdateDataRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
    }
@@ -86,7 +84,7 @@ class DataRecordMergerTest {
    void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
        DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        GroupedDataRecord actualGroupedDataRecord = actual.iterator().next();
        assertThat(actualGroupedDataRecord.getBatchUpdateDataRecords().size(), is(2));
@@ -110,7 +108,7 @@ class DataRecordMergerTest {
    void assertInsertBeforeDelete() {
        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
    }
@@ -119,7 +117,7 @@ class DataRecordMergerTest {
    void assertUpdateBeforeDelete() {
        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
    }
@@ -128,7 +126,7 @@ class DataRecordMergerTest {
    void assertUpdatePrimaryKeyBeforeDelete() {
        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
        DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
        assertThat(actual.size(), is(1));
        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
    }
@@ -151,7 +149,7 @@ class DataRecordMergerTest {
mockInsertDataRecord("t1", 10, 10, 10),
mockDeleteDataRecord("t1", 3, 1, 1),
mockInsertDataRecord("t2", 1, 1, 1));
- Collection<GroupedDataRecord> groupedDataRecords =
dataRecordMerger.group(dataRecords);
+ Collection<GroupedDataRecord> groupedDataRecords = new
DataRecordGroupEngine().group(dataRecords);
assertThat(groupedDataRecords.size(), is(2));
Iterator<GroupedDataRecord> groupedDataRecordsIterator =
groupedDataRecords.iterator();
GroupedDataRecord actualGroupedDataRecord1 =
groupedDataRecordsIterator.next();