This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a8e57a0c55b Improve pipeline incremental task persist position (#29749)
a8e57a0c55b is described below

commit a8e57a0c55b765d27481a9d9de0165aa3a26506b
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Jan 17 15:08:01 2024 +0800

    Improve pipeline incremental task persist position (#29749)
---
 .../core/importer/SingleChannelConsumerImporter.java        |  4 +---
 .../ingest/client/netty/MySQLBinlogEventPacketDecoder.java  |  2 +-
 .../data/pipeline/cdc/core/importer/CDCImporter.java        | 13 +++++++------
 3 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
index fec3620b32e..09e74e55d80 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -22,14 +22,12 @@ import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Single channel consumer importer.
@@ -50,7 +48,7 @@ public final class SingleChannelConsumerImporter extends AbstractPipelineLifecyc
     @Override
     protected void runBlocking() {
         while (isRunning()) {
-            List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+            List<Record> records = channel.fetch(batchSize, timeoutMillis);
             if (records.isEmpty()) {
                 continue;
             }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 4b3fff759b3..140660a8057 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -81,7 +81,7 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
                 return;
             }
             if (binlogEvent.get() instanceof PlaceholderEvent) {
-                out.add(binlogEvent);
+                out.add(binlogEvent.get());
                 skipChecksum(binlogEventHeader.getEventType(), in);
                 return;
             }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index ac84b7183d4..27addbe865f 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -27,15 +27,14 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 
@@ -217,15 +216,17 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
     
     private void doWithoutSorting(final CDCChannelProgressPair progressPair) {
         PipelineChannel channel = progressPair.getChannel();
-        List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+        List<Record> records = channel.fetch(batchSize, timeoutMillis);
         if (records.isEmpty()) {
             return;
         }
         Record lastRecord = records.get(records.size() - 1);
-        if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
+        if (records.stream().noneMatch(DataRecord.class::isInstance)) {
             channel.ack(records);
             progressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
-            originalChannelProgressPairs.remove(progressPair);
+            if (lastRecord instanceof FinishedRecord) {
+                originalChannelProgressPairs.remove(progressPair);
+            }
             return;
         }
         if (null != rateLimitAlgorithm) {

Reply via email to