This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a8e57a0c55b Improve pipeline incremental task persist position (#29749)
a8e57a0c55b is described below
commit a8e57a0c55b765d27481a9d9de0165aa3a26506b
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Jan 17 15:08:01 2024 +0800
Improve pipeline incremental task persist position (#29749)
---
.../core/importer/SingleChannelConsumerImporter.java | 4 +---
.../ingest/client/netty/MySQLBinlogEventPacketDecoder.java | 2 +-
.../data/pipeline/cdc/core/importer/CDCImporter.java | 13 +++++++------
3 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index fec3620b32e..09e74e55d80 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -22,14 +22,12 @@ import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Single channel consumer importer.
@@ -50,7 +48,7 @@ public final class SingleChannelConsumerImporter extends AbstractPipelineLifecyc
@Override
protected void runBlocking() {
while (isRunning()) {
- List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+ List<Record> records = channel.fetch(batchSize, timeoutMillis);
if (records.isEmpty()) {
continue;
}
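
The removed filter above had been discarding PlaceholderRecord instances right after fetch, so a batch made up only of placeholders turned into an empty list and was skipped; keeping placeholders in the batch lets the importer still acknowledge it, which appears to be what allows the incremental position to be persisted even when no data records arrive. A standalone sketch of that difference, with plain strings standing in for records (none of this is ShardingSphere API):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Standalone illustration only: why dropping placeholder records at fetch time
    // keeps the incremental position from advancing.
    public final class PlaceholderFilterSketch {

        public static void main(final String[] args) {
            // A batch that contains nothing but placeholder (heartbeat-like) records.
            List<String> fetched = Arrays.asList("placeholder", "placeholder");
            // Old behavior: placeholders were filtered out right after fetch, the batch
            // became empty, nothing was acked, and no position was persisted.
            List<String> filtered = fetched.stream().filter(each -> !"placeholder".equals(each)).collect(Collectors.toList());
            System.out.println(filtered.isEmpty()); // true -> batch skipped
            // New behavior: the full batch reaches the importer, which can ack it and
            // report progress, so the persisted position still moves forward.
            System.out.println(fetched.isEmpty());  // false -> batch processed
        }
    }
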
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 4b3fff759b3..140660a8057 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -81,7 +81,7 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
return;
}
if (binlogEvent.get() instanceof PlaceholderEvent) {
- out.add(binlogEvent);
+ out.add(binlogEvent.get());
skipChecksum(binlogEventHeader.getEventType(), in);
return;
}
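
The one-line fix above emits the decoded event instead of its holder: binlogEvent evidently wraps the event (it exposes get()), so the old code handed the wrapper object itself to the next handler. A standalone sketch of the difference, using AtomicReference as a stand-in for that holder (not ShardingSphere API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    // Standalone illustration only: adding a holder to the decoder output instead of
    // the value it wraps hands downstream handlers the wrong object.
    public final class HolderUnwrapSketch {

        public static void main(final String[] args) {
            AtomicReference<String> binlogEvent = new AtomicReference<>("placeholder-event");
            List<Object> out = new ArrayList<>();
            out.add(binlogEvent);       // before the fix: the wrapper itself is emitted
            out.add(binlogEvent.get()); // after the fix: the decoded event is emitted
            System.out.println(out.get(0).getClass().getSimpleName()); // AtomicReference
            System.out.println(out.get(1).getClass().getSimpleName()); // String
        }
    }
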
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index ac84b7183d4..27addbe865f 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -27,15 +27,14 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
@@ -217,15 +216,17 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
private void doWithoutSorting(final CDCChannelProgressPair progressPair) {
PipelineChannel channel = progressPair.getChannel();
- List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+ List<Record> records = channel.fetch(batchSize, timeoutMillis);
if (records.isEmpty()) {
return;
}
Record lastRecord = records.get(records.size() - 1);
- if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
+ if (records.stream().noneMatch(DataRecord.class::isInstance)) {
channel.ack(records);
progressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
- originalChannelProgressPairs.remove(progressPair);
+ if (lastRecord instanceof FinishedRecord) {
+ originalChannelProgressPairs.remove(progressPair);
+ }
return;
}
if (null != rateLimitAlgorithm) {