This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f3807cd33e Refactor CDCImporter (#29650)
0f3807cd33e is described below

commit 0f3807cd33e1d17ac18d815dbb1d8a713466e2b0
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jan 4 16:22:12 2024 +0800

    Refactor CDCImporter (#29650)
    
    * Refactor CDCImporter
    
    * Refactor CDCImporter
---
 .../pipeline/cdc/core/importer/CDCImporter.java    | 149 ++++++++++++---------
 1 file changed, 83 insertions(+), 66 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index be3f1312b3a..ac84b7183d4 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -40,6 +40,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.Pipeli
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -91,51 +92,12 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
         }
     }
     
-    private void doWithoutSorting() {
-        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
-            PipelineChannel channel = each.getChannel();
-            List<Record> records = channel.fetch(batchSize, 
timeoutMillis).stream().filter(record -> !(record instanceof 
PlaceholderRecord)).collect(Collectors.toList());
-            if (records.isEmpty()) {
-                continue;
-            }
-            Record lastRecord = records.get(records.size() - 1);
-            if (lastRecord instanceof FinishedRecord && 
records.stream().noneMatch(DataRecord.class::isInstance)) {
-                channel.ack(records);
-                each.getJobProgressListener().onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
-                originalChannelProgressPairs.remove(each);
-                continue;
-            }
-            if (null != rateLimitAlgorithm) {
-                rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 
1);
-            }
-            String ackId = CDCAckId.build(importerId).marshal();
-            ackCache.put(ackId, Collections.singletonList(Pair.of(each, new 
CDCAckPosition(records.get(records.size() - 1), 
getDataRecordsCount(records)))));
-            sink.write(ackId, records);
-        }
-    }
-    
     @SneakyThrows(InterruptedException.class)
     private void doWithSorting() {
         if (null != rateLimitAlgorithm) {
             rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
         }
-        CSNRecords firstCsnRecords = null;
-        List<CSNRecords> csnRecordsList = new LinkedList<>();
-        for (int i = 0, count = originalChannelProgressPairs.size(); i < 
count; i++) {
-            prepareTransactionRecords(originalChannelProgressPairs);
-            CSNRecords csnRecords = csnRecordsQueue.peek();
-            if (null == csnRecords) {
-                continue;
-            }
-            if (null == firstCsnRecords) {
-                csnRecords = csnRecordsQueue.poll();
-                firstCsnRecords = csnRecords;
-                csnRecordsList.add(csnRecords);
-            } else if (csnRecords.getCsn() == firstCsnRecords.getCsn()) {
-                csnRecords = csnRecordsQueue.poll();
-                csnRecordsList.add(csnRecords);
-            }
-        }
+        List<CSNRecords> csnRecordsList = getCsnRecordsList();
         if (csnRecordsList.isEmpty()) {
             TimeUnit.MILLISECONDS.sleep(timeoutMillis);
             return;
@@ -143,26 +105,31 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
         // TODO Combine small transactions into a large transaction, to 
improve transformation performance.
         String ackId = CDCAckId.build(importerId).marshal();
         if (1 == csnRecordsList.size()) {
-            CSNRecords csnRecords = csnRecordsList.get(0);
-            List<Record> records = csnRecords.getRecords();
-            ackCache.put(ackId, 
Collections.singletonList(Pair.of(csnRecords.getChannelProgressPair(), new 
CDCAckPosition(records.get(records.size() - 1), 
getDataRecordsCount(records)))));
-            sink.write(ackId, filterDataRecords(records));
-            return;
+            processCSNRecords(csnRecordsList.get(0), ackId);
+        } else {
+            processCSNRecordsList(csnRecordsList, ackId);
         }
-        List<Pair<CDCChannelProgressPair, CDCAckPosition>> ackValue = 
csnRecordsList.stream().map(each -> Pair.of(each.getChannelProgressPair(),
-                new 
CDCAckPosition(each.getRecords().get(each.getRecords().size() - 1), 
getDataRecordsCount(each.getRecords())))).collect(Collectors.toList());
-        ackCache.put(ackId, ackValue);
-        List<Record> records = new ArrayList<>(ackValue.stream().mapToInt(each 
-> each.getRight().getDataRecordCount()).sum());
-        csnRecordsList.forEach(each -> 
records.addAll(filterDataRecords(each.getRecords())));
-        sink.write(ackId, filterDataRecords(records));
     }
     
-    private int getDataRecordsCount(final List<Record> records) {
-        return (int) 
records.stream().filter(DataRecord.class::isInstance).count();
-    }
-    
-    private List<Record> filterDataRecords(final List<Record> records) {
-        return 
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
+    private List<CSNRecords> getCsnRecordsList() {
+        List<CSNRecords> result = new LinkedList<>();
+        CSNRecords firstRecords = null;
+        for (int i = 0, count = originalChannelProgressPairs.size(); i < 
count; i++) {
+            prepareTransactionRecords(originalChannelProgressPairs);
+            CSNRecords csnRecords = csnRecordsQueue.peek();
+            if (null == csnRecords) {
+                continue;
+            }
+            if (null == firstRecords) {
+                csnRecords = csnRecordsQueue.poll();
+                firstRecords = csnRecords;
+                result.add(csnRecords);
+            } else if (csnRecords.getCsn() == firstRecords.getCsn()) {
+                csnRecords = csnRecordsQueue.poll();
+                result.add(csnRecords);
+            }
+        }
+        return result;
     }
     
     // TODO openGauss CSN should be incremented for every transaction. 
Currently, CSN might be duplicated in transactions.
@@ -190,15 +157,6 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
         }
     }
     
-    private DataRecord findFirstDataRecord(final List<Record> records) {
-        for (Record each : records) {
-            if (each instanceof DataRecord) {
-                return (DataRecord) each;
-            }
-        }
-        throw new IllegalStateException("No data record found");
-    }
-    
     private void prepareWhenQueueIsNotEmpty(final List<CDCChannelProgressPair> 
channelProgressPairs, final long oldestCSN) {
         for (CDCChannelProgressPair each : channelProgressPairs) {
             PipelineChannel channel = each.getChannel();
@@ -219,6 +177,65 @@ public final class CDCImporter extends 
AbstractPipelineLifecycleRunnable impleme
         }
     }
     
+    private int getDataRecordsCount(final List<Record> records) {
+        return (int) 
records.stream().filter(DataRecord.class::isInstance).count();
+    }
+    
+    private DataRecord findFirstDataRecord(final List<Record> records) {
+        for (Record each : records) {
+            if (each instanceof DataRecord) {
+                return (DataRecord) each;
+            }
+        }
+        throw new IllegalStateException("No data record found");
+    }
+    
+    private void processCSNRecords(final CSNRecords csnRecords, final String 
ackId) {
+        List<Record> records = csnRecords.getRecords();
+        ackCache.put(ackId, 
Collections.singletonList(Pair.of(csnRecords.getChannelProgressPair(), new 
CDCAckPosition(records.get(records.size() - 1), 
getDataRecordsCount(records)))));
+        sink.write(ackId, filterDataRecords(records));
+    }
+    
+    private void processCSNRecordsList(final List<CSNRecords> csnRecordsList, 
final String ackId) {
+        List<Pair<CDCChannelProgressPair, CDCAckPosition>> ackValue = 
csnRecordsList.stream().map(each -> Pair.of(each.getChannelProgressPair(),
+                new 
CDCAckPosition(each.getRecords().get(each.getRecords().size() - 1), 
getDataRecordsCount(each.getRecords())))).collect(Collectors.toList());
+        ackCache.put(ackId, ackValue);
+        Collection<Record> records = new 
ArrayList<>(ackValue.stream().mapToInt(each -> 
each.getRight().getDataRecordCount()).sum());
+        csnRecordsList.forEach(each -> 
records.addAll(filterDataRecords(each.getRecords())));
+        sink.write(ackId, filterDataRecords(records));
+    }
+    
+    private List<Record> filterDataRecords(final Collection<Record> records) {
+        return 
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
+    }
+    
+    private void doWithoutSorting() {
+        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+            doWithoutSorting(each);
+        }
+    }
+    
+    private void doWithoutSorting(final CDCChannelProgressPair progressPair) {
+        PipelineChannel channel = progressPair.getChannel();
+        List<Record> records = channel.fetch(batchSize, 
timeoutMillis).stream().filter(each -> !(each instanceof 
PlaceholderRecord)).collect(Collectors.toList());
+        if (records.isEmpty()) {
+            return;
+        }
+        Record lastRecord = records.get(records.size() - 1);
+        if (lastRecord instanceof FinishedRecord && 
records.stream().noneMatch(DataRecord.class::isInstance)) {
+            channel.ack(records);
+            progressPair.getJobProgressListener().onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
+            originalChannelProgressPairs.remove(progressPair);
+            return;
+        }
+        if (null != rateLimitAlgorithm) {
+            rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
+        }
+        String ackId = CDCAckId.build(importerId).marshal();
+        ackCache.put(ackId, Collections.singletonList(Pair.of(progressPair, 
new CDCAckPosition(records.get(records.size() - 1), 
getDataRecordsCount(records)))));
+        sink.write(ackId, records);
+    }
+    
     /**
      * Ack.
      *

Reply via email to