This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0f3807cd33e Refactor CDCImporter (#29650)
0f3807cd33e is described below
commit 0f3807cd33e1d17ac18d815dbb1d8a713466e2b0
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jan 4 16:22:12 2024 +0800
Refactor CDCImporter (#29650)
* Refactor CDCImporter
* Refactor CDCImporter
---
.../pipeline/cdc/core/importer/CDCImporter.java | 149 ++++++++++++---------
1 file changed, 83 insertions(+), 66 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index be3f1312b3a..ac84b7183d4 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -40,6 +40,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.Pipeli
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
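
The new java.util.Collection import backs a signature change later in this diff: filterDataRecords(...) now takes Collection<Record> rather than List<Record>, so both the fetched List and the pre-sized ArrayList built in processCSNRecordsList(...) fit. A minimal sketch of that widening with hypothetical names, not project code:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;

    final class WidenParameterSketch {

        // Accepting Collection keeps the helper usable from any caller that
        // holds a List, Set, or other collection of elements.
        static List<Integer> evens(final Collection<Integer> values) {
            return values.stream().filter(each -> 0 == each % 2).collect(Collectors.toList());
        }

        public static void main(final String[] args) {
            System.out.println(evens(new ArrayList<>(List.of(1, 2, 3, 4))));
        }
    }

Accepting the most general type a method actually needs keeps helpers reusable without casts or copies.
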
@@ -91,51 +92,12 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
}
}
-    private void doWithoutSorting() {
-        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
-            PipelineChannel channel = each.getChannel();
-            List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(record -> !(record instanceof PlaceholderRecord)).collect(Collectors.toList());
-            if (records.isEmpty()) {
-                continue;
-            }
-            Record lastRecord = records.get(records.size() - 1);
-            if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
-                channel.ack(records);
-                each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
-                originalChannelProgressPairs.remove(each);
-                continue;
-            }
-            if (null != rateLimitAlgorithm) {
-                rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
-            }
-            String ackId = CDCAckId.build(importerId).marshal();
-            ackCache.put(ackId, Collections.singletonList(Pair.of(each, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
-            sink.write(ackId, records);
-        }
-    }
-
@SneakyThrows(InterruptedException.class)
private void doWithSorting() {
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
}
-        CSNRecords firstCsnRecords = null;
-        List<CSNRecords> csnRecordsList = new LinkedList<>();
-        for (int i = 0, count = originalChannelProgressPairs.size(); i < count; i++) {
-            prepareTransactionRecords(originalChannelProgressPairs);
-            CSNRecords csnRecords = csnRecordsQueue.peek();
-            if (null == csnRecords) {
-                continue;
-            }
-            if (null == firstCsnRecords) {
-                csnRecords = csnRecordsQueue.poll();
-                firstCsnRecords = csnRecords;
-                csnRecordsList.add(csnRecords);
-            } else if (csnRecords.getCsn() == firstCsnRecords.getCsn()) {
-                csnRecords = csnRecordsQueue.poll();
-                csnRecordsList.add(csnRecords);
-            }
-        }
+        List<CSNRecords> csnRecordsList = getCsnRecordsList();
if (csnRecordsList.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(timeoutMillis);
return;
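
The extracted getCsnRecordsList() (shown later in this diff) gathers, per pass, only the queued batches whose CSN matches the first polled batch's CSN, i.e. the pieces of one transaction arriving on different channels. A minimal, self-contained sketch of that grouping idea, assuming a hypothetical TxBatch stand-in for CSNRecords and omitting the prepareTransactionRecords(...) call the real loop makes before each peek:

    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    final class CsnGroupingSketch {

        // Hypothetical stand-in for CSNRecords: one transaction's records
        // tagged with its commit sequence number (CSN).
        record TxBatch(long csn, List<String> records) {
        }

        // Poll the queue while the head shares the first polled batch's CSN,
        // so the result holds exactly one CSN's worth of batches.
        static List<TxBatch> pollSameCsn(final Queue<TxBatch> queue) {
            List<TxBatch> result = new LinkedList<>();
            TxBatch first = queue.poll();
            if (null == first) {
                return result;
            }
            result.add(first);
            while (null != queue.peek() && queue.peek().csn() == first.csn()) {
                result.add(queue.poll());
            }
            return result;
        }

        public static void main(final String[] args) {
            Queue<TxBatch> queue = new LinkedList<>();
            queue.add(new TxBatch(100L, List.of("t1-r1")));
            queue.add(new TxBatch(100L, List.of("t1-r2")));
            queue.add(new TxBatch(101L, List.of("t2-r1")));
            // Prints the two batches with CSN 100; CSN 101 stays queued.
            System.out.println(pollSameCsn(queue));
        }
    }
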
@@ -143,26 +105,31 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
        // TODO Combine small transactions into a large transaction, to improve transformation performance.
String ackId = CDCAckId.build(importerId).marshal();
if (1 == csnRecordsList.size()) {
-            CSNRecords csnRecords = csnRecordsList.get(0);
-            List<Record> records = csnRecords.getRecords();
-            ackCache.put(ackId, Collections.singletonList(Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
-            sink.write(ackId, filterDataRecords(records));
-            return;
+            processCSNRecords(csnRecordsList.get(0), ackId);
+        } else {
+            processCSNRecordsList(csnRecordsList, ackId);
}
-        List<Pair<CDCChannelProgressPair, CDCAckPosition>> ackValue = csnRecordsList.stream().map(each -> Pair.of(each.getChannelProgressPair(),
-                new CDCAckPosition(each.getRecords().get(each.getRecords().size() - 1), getDataRecordsCount(each.getRecords())))).collect(Collectors.toList());
-        ackCache.put(ackId, ackValue);
-        List<Record> records = new ArrayList<>(ackValue.stream().mapToInt(each -> each.getRight().getDataRecordCount()).sum());
-        csnRecordsList.forEach(each -> records.addAll(filterDataRecords(each.getRecords())));
-        sink.write(ackId, filterDataRecords(records));
}
-    private int getDataRecordsCount(final List<Record> records) {
-        return (int) records.stream().filter(DataRecord.class::isInstance).count();
-    }
-
-    private List<Record> filterDataRecords(final List<Record> records) {
-        return records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
+    private List<CSNRecords> getCsnRecordsList() {
+        List<CSNRecords> result = new LinkedList<>();
+        CSNRecords firstRecords = null;
+        for (int i = 0, count = originalChannelProgressPairs.size(); i < count; i++) {
+            prepareTransactionRecords(originalChannelProgressPairs);
+            CSNRecords csnRecords = csnRecordsQueue.peek();
+            if (null == csnRecords) {
+                continue;
+            }
+            if (null == firstRecords) {
+                csnRecords = csnRecordsQueue.poll();
+                firstRecords = csnRecords;
+                result.add(csnRecords);
+            } else if (csnRecords.getCsn() == firstRecords.getCsn()) {
+                csnRecords = csnRecordsQueue.poll();
+                result.add(csnRecords);
+            }
+        }
+        return result;
    }
    // TODO openGauss CSN should be incremented for every transaction. Currently, CSN might be duplicated in transactions.
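
The refactored branch above delegates the multi-batch case to processCSNRecordsList(...), which keeps one ack id for the whole CSN group: it caches one CDCAckPosition per batch under that id, then flattens every batch into a single sink write. A hedged sketch of that bookkeeping, with AckPosition and Sink as illustrative stand-ins (the real code additionally filters to data records):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class MultiBatchAckSketch {

        // Hypothetical stand-ins for CDCAckPosition and the importer's sink.
        record AckPosition(String lastRecord, int recordCount) {
        }

        interface Sink {
            void write(String ackId, List<String> records);
        }

        // One ack id covers every contributing batch: cache the last record
        // and count per batch, then flatten all batches into a single write,
        // pre-sizing the merged list with the summed counts as the diff does.
        static void writeBatches(final String ackId, final List<List<String>> batches,
                                 final Map<String, List<AckPosition>> ackCache, final Sink sink) {
            List<AckPosition> positions = new ArrayList<>(batches.size());
            int total = 0;
            for (List<String> each : batches) {
                positions.add(new AckPosition(each.get(each.size() - 1), each.size()));
                total += each.size();
            }
            ackCache.put(ackId, positions);
            List<String> merged = new ArrayList<>(total);
            batches.forEach(merged::addAll);
            sink.write(ackId, merged);
        }

        public static void main(final String[] args) {
            Map<String, List<AckPosition>> ackCache = new HashMap<>();
            writeBatches("ack-1", List.of(List.of("a", "b"), List.of("c")), ackCache,
                    (ackId, records) -> System.out.println(ackId + " -> " + records));
            System.out.println(ackCache);
        }
    }
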
@@ -190,15 +157,6 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
}
}
-    private DataRecord findFirstDataRecord(final List<Record> records) {
-        for (Record each : records) {
-            if (each instanceof DataRecord) {
-                return (DataRecord) each;
-            }
-        }
-        throw new IllegalStateException("No data record found");
-    }
-
    private void prepareWhenQueueIsNotEmpty(final List<CDCChannelProgressPair> channelProgressPairs, final long oldestCSN) {
for (CDCChannelProgressPair each : channelProgressPairs) {
PipelineChannel channel = each.getChannel();
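
findFirstDataRecord(...) is only relocated by this commit, not removed; its contract is "first element of the wanted subtype, or fail fast". A generic sketch of that pattern, where firstOfType is a hypothetical name rather than project API:

    import java.util.List;

    final class FirstOfTypeSketch {

        // Return the first element assignable to the requested type, mirroring
        // the instanceof walk in findFirstDataRecord(); throw if none exists.
        static <T> T firstOfType(final List<?> records, final Class<T> type) {
            for (Object each : records) {
                if (type.isInstance(each)) {
                    return type.cast(each);
                }
            }
            throw new IllegalStateException("No " + type.getSimpleName() + " found");
        }

        public static void main(final String[] args) {
            List<Object> records = List.of("placeholder", 42, "tail");
            System.out.println(firstOfType(records, Integer.class));
        }
    }
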
@@ -219,6 +177,65 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
}
}
+    private int getDataRecordsCount(final List<Record> records) {
+        return (int) records.stream().filter(DataRecord.class::isInstance).count();
+    }
+
+    private DataRecord findFirstDataRecord(final List<Record> records) {
+        for (Record each : records) {
+            if (each instanceof DataRecord) {
+                return (DataRecord) each;
+            }
+        }
+        throw new IllegalStateException("No data record found");
+    }
+
+    private void processCSNRecords(final CSNRecords csnRecords, final String ackId) {
+        List<Record> records = csnRecords.getRecords();
+        ackCache.put(ackId, Collections.singletonList(Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
+        sink.write(ackId, filterDataRecords(records));
+    }
+
+    private void processCSNRecordsList(final List<CSNRecords> csnRecordsList, final String ackId) {
+        List<Pair<CDCChannelProgressPair, CDCAckPosition>> ackValue = csnRecordsList.stream().map(each -> Pair.of(each.getChannelProgressPair(),
+                new CDCAckPosition(each.getRecords().get(each.getRecords().size() - 1), getDataRecordsCount(each.getRecords())))).collect(Collectors.toList());
+        ackCache.put(ackId, ackValue);
+        Collection<Record> records = new ArrayList<>(ackValue.stream().mapToInt(each -> each.getRight().getDataRecordCount()).sum());
+        csnRecordsList.forEach(each -> records.addAll(filterDataRecords(each.getRecords())));
+        sink.write(ackId, filterDataRecords(records));
+    }
+
+    private List<Record> filterDataRecords(final Collection<Record> records) {
+        return records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
+    }
+
+    private void doWithoutSorting() {
+        for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+            doWithoutSorting(each);
+        }
+    }
+
+    private void doWithoutSorting(final CDCChannelProgressPair progressPair) {
+        PipelineChannel channel = progressPair.getChannel();
+        List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+        if (records.isEmpty()) {
+            return;
+        }
+        Record lastRecord = records.get(records.size() - 1);
+        if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
+            channel.ack(records);
+            progressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+            originalChannelProgressPairs.remove(progressPair);
+            return;
+        }
+        if (null != rateLimitAlgorithm) {
+            rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
+        }
+        String ackId = CDCAckId.build(importerId).marshal();
+        ackCache.put(ackId, Collections.singletonList(Pair.of(progressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
+        sink.write(ackId, records);
+    }
+
/**
* Ack.
*
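
For context on the relocated doWithoutSorting(...) above: a channel whose fetched batch ends with a FinishedRecord and contains no data records is acked and dropped from the live channel set instead of being written to the sink. A compact sketch of that predicate, using hypothetical marker types in place of the pipeline's Record hierarchy:

    import java.util.List;

    final class FinishedChannelSketch {

        // Hypothetical marker types standing in for the pipeline's records.
        interface Record {
        }

        record DataRecord() implements Record {
        }

        record FinishedRecord() implements Record {
        }

        // True when the batch signals completion without carrying data: the
        // last record is a FinishedRecord and no DataRecord appears in it.
        static boolean isFinishedWithoutData(final List<Record> records) {
            return !records.isEmpty()
                    && records.get(records.size() - 1) instanceof FinishedRecord
                    && records.stream().noneMatch(DataRecord.class::isInstance);
        }

        public static void main(final String[] args) {
            System.out.println(isFinishedWithoutData(List.of(new FinishedRecord())));
            System.out.println(isFinishedWithoutData(List.of(new DataRecord(), new FinishedRecord())));
        }
    }
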