This is an automated email from the ASF dual-hosted git repository.

totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 98a7624a782 Refactor DataRecordGroupEngine (#29514)
98a7624a782 is described below

commit 98a7624a782a2c1ece4b483d6b8c7b9b7e50caea
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 23 17:30:41 2023 +0800

    Refactor DataRecordGroupEngine (#29514)
---
 .../core/importer/DataRecordGroupEngine.java       | 80 +++++++++++++++++++
 .../pipeline/core/importer/DataRecordMerger.java   | 92 ----------------------
 .../core/importer/sink/PipelineDataSourceSink.java |  9 ++-
 .../pipeline/core/ingest/record/DataRecord.java    |  2 +-
 ...gerTest.java => DataRecordGroupEngineTest.java} | 24 +++---
 5 files changed, 97 insertions(+), 110 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
new file mode 100644
index 00000000000..0f084944d59
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngine.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.importer;
+
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord.Key;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Data Record group engine.
+ */
+public final class DataRecordGroupEngine {
+    
+    /**
+     * Group by table and operation type.
+     *
+     * @param records data records
+     * @return grouped data records
+     */
+    public Collection<GroupedDataRecord> group(final Collection<DataRecord> records) {
+        Map<Key, Boolean> duplicateKeys = getDuplicateKeys(records);
+        Collection<String> tableNames = new LinkedHashSet<>();
+        Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
+        Map<String, Map<String, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
+        for (DataRecord each : records) {
+            tableNames.add(each.getTableName());
+            if (duplicateKeys.getOrDefault(getKey(each), false)) {
+                nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
+            } else {
+                batchDataRecords.computeIfAbsent(each.getTableName(), ignored -> new HashMap<>()).computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
+            }
+        }
+        return tableNames.stream().map(each -> getGroupedDataRecord(
+                each, batchDataRecords.getOrDefault(each, Collections.emptyMap()), nonBatchRecords.getOrDefault(each, Collections.emptyList()))).collect(Collectors.toList());
+    }
+    
+    private Map<Key, Boolean> getDuplicateKeys(final Collection<DataRecord> records) {
+        Map<Key, Boolean> result = new HashMap<>();
+        for (DataRecord each : records) {
+            Key key = getKey(each);
+            result.put(key, result.containsKey(key));
+        }
+        return result;
+    }
+    
+    private Key getKey(final DataRecord record) {
+        return IngestDataChangeType.DELETE.equals(record.getType()) ? record.getOldKey() : record.getKey();
+    }
+    
+    private GroupedDataRecord getGroupedDataRecord(final String tableName, final Map<String, List<DataRecord>> batchRecords, final List<DataRecord> nonBatchRecords) {
+        return new GroupedDataRecord(tableName, batchRecords.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
+                batchRecords.getOrDefault(IngestDataChangeType.UPDATE, Collections.emptyList()), batchRecords.getOrDefault(IngestDataChangeType.DELETE, Collections.emptyList()), nonBatchRecords);
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
deleted file mode 100644
index b9759f49fc1..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.importer;
-
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord.Key;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * Data Record merger.
- */
-public final class DataRecordMerger {
-    
-    /**
-     * Group by table and type.
-     *
-     * @param dataRecords data records
-     * @return grouped data records
-     */
-    public Collection<GroupedDataRecord> group(final Collection<DataRecord> dataRecords) {
-        int insertCount = 0;
-        Map<Key, Boolean> duplicateKeyMap = new HashMap<>();
-        Set<String> tableNames = new LinkedHashSet<>();
-        for (DataRecord each : dataRecords) {
-            if (IngestDataChangeType.INSERT.equals(each.getType())) {
-                insertCount++;
-            }
-            tableNames.add(each.getTableName());
-            Key key = getKeyFromDataRecord(each);
-            duplicateKeyMap.put(key, duplicateKeyMap.containsKey(key));
-        }
-        List<GroupedDataRecord> result = new ArrayList<>(100);
-        if (insertCount == dataRecords.size()) {
-            Map<String, List<DataRecord>> tableGroup = dataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
-            for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
-                result.add(new GroupedDataRecord(entry.getKey(), entry.getValue(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
-            }
-            return result;
-        }
-        Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
-        Map<String, Map<String, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
-        for (DataRecord each : dataRecords) {
-            Key key = getKeyFromDataRecord(each);
-            if (duplicateKeyMap.getOrDefault(key, false)) {
-                nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
-                continue;
-            }
-            Map<String, List<DataRecord>> recordMap = batchDataRecords.computeIfAbsent(each.getTableName(), ignored -> new HashMap<>());
-            recordMap.computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
-        }
-        for (String each : tableNames) {
-            Map<String, List<DataRecord>> batchMap = batchDataRecords.getOrDefault(each, Collections.emptyMap());
-            List<DataRecord> nonBatchRecordMap = nonBatchRecords.getOrDefault(each, Collections.emptyList());
-            result.add(new GroupedDataRecord(each, batchMap.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
-                    batchMap.getOrDefault(IngestDataChangeType.UPDATE, Collections.emptyList()), batchMap.getOrDefault(IngestDataChangeType.DELETE, Collections.emptyList()), nonBatchRecordMap));
-        }
-        return result;
-    }
-    
-    private Key getKeyFromDataRecord(final DataRecord dataRecord) {
-        return IngestDataChangeType.DELETE.equals(dataRecord.getType()) ? dataRecord.getOldKey() : dataRecord.getKey();
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 5b745667017..c59c7068194 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -23,7 +23,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
-import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
+import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordGroupEngine;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
@@ -55,8 +55,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class PipelineDataSourceSink implements PipelineSink {
     
-    private static final DataRecordMerger MERGER = new DataRecordMerger();
-    
     @Getter(AccessLevel.PROTECTED)
     private final ImporterConfiguration importerConfig;
     
@@ -66,6 +64,8 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     private final PipelineImportSQLBuilder importSQLBuilder;
     
+    private final DataRecordGroupEngine groupEngine;
+    
     private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
     
     private final AtomicReference<Statement> updateStatement = new AtomicReference<>();
@@ -77,6 +77,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
         this.dataSourceManager = dataSourceManager;
         rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
         importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
+        groupEngine = new DataRecordGroupEngine();
     }
     
     @Override
@@ -100,7 +101,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
                 insertRecordNumber++;
             }
         }
-        Collection<GroupedDataRecord> groupedDataRecords = MERGER.group(dataRecords);
+        Collection<GroupedDataRecord> groupedDataRecords = groupEngine.group(dataRecords);
         for (GroupedDataRecord each : groupedDataRecords) {
             flushInternal(dataSource, each.getBatchDeleteDataRecords());
             flushInternal(dataSource, each.getBatchInsertDataRecords());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index 6096c358e10..5e02714a5c1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -115,8 +115,8 @@ public final class DataRecord extends Record {
         return new Key(tableName, oldUniqueKeyValues);
     }
     
-    @EqualsAndHashCode
     @RequiredArgsConstructor
+    @EqualsAndHashCode
     public static class Key {
         
         private final String tableName;
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
similarity index 88%
rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
index aa55aaccd6b..0be6fff94e1 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordGroupEngineTest.java
@@ -33,15 +33,13 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-class DataRecordMergerTest {
-    
-    private final DataRecordMerger dataRecordMerger = new DataRecordMerger();
+class DataRecordGroupEngineTest {
     
     @Test
     void assertDeleteBeforeInsert() {
         DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
         DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
@@ -50,7 +48,7 @@ class DataRecordMergerTest {
     void assertInsertBeforeUpdate() {
         DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
@@ -59,7 +57,7 @@ class DataRecordMergerTest {
     void assertInsertBeforeUpdatePrimaryKey() {
         DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
@@ -68,7 +66,7 @@ class DataRecordMergerTest {
     void assertUpdateBeforeUpdate() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
@@ -77,7 +75,7 @@ class DataRecordMergerTest {
     void assertUpdateBeforeUpdatePrimaryKey() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertDataRecordsMatched(actual.iterator().next().getBatchUpdateDataRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
@@ -86,7 +84,7 @@ class DataRecordMergerTest {
     void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
         DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         GroupedDataRecord actualGroupedDataRecord = actual.iterator().next();
         assertThat(actualGroupedDataRecord.getBatchUpdateDataRecords().size(), is(2));
@@ -110,7 +108,7 @@ class DataRecordMergerTest {
     void assertInsertBeforeDelete() {
         DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
@@ -119,7 +117,7 @@ class DataRecordMergerTest {
     void assertUpdateBeforeDelete() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
         DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
@@ -128,7 +126,7 @@ class DataRecordMergerTest {
     void assertUpdatePrimaryKeyBeforeDelete() {
         DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
         DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
-        Collection<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
+        Collection<GroupedDataRecord> actual = new DataRecordGroupEngine().group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
         assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
@@ -151,7 +149,7 @@ class DataRecordMergerTest {
                 mockInsertDataRecord("t1", 10, 10, 10),
                 mockDeleteDataRecord("t1", 3, 1, 1),
                 mockInsertDataRecord("t2", 1, 1, 1));
-        Collection<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
+        Collection<GroupedDataRecord> groupedDataRecords = new DataRecordGroupEngine().group(dataRecords);
         assertThat(groupedDataRecords.size(), is(2));
         Iterator<GroupedDataRecord> groupedDataRecordsIterator = groupedDataRecords.iterator();
         GroupedDataRecord actualGroupedDataRecord1 = groupedDataRecordsIterator.next();

Reply via email to