This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dd6caf4801 Refactor pipeline channel, read records by transaction at 
increment task (#25787)
7dd6caf4801 is described below

commit 7dd6caf48018ef390780545b648b13f251464673
Author: Xinze Guo <[email protected]>
AuthorDate: Tue May 23 20:35:18 2023 +0800

    Refactor pipeline channel, read records by transaction at increment task 
(#25787)
    
    * Refactor pipeline channel, read records by transaction
    
    * Add MySQL decode tx switch
    
    * Refactor importers to importer
    
    * Simplify code and roll back MultiplexMemoryPipelineChannel
    
    * Improve MultiplexMemoryPipelineChannel and fix code style
    
    * Improve the reference URL in a comment
---
 .../api/ingest/channel/PipelineChannel.java        |  4 +-
 .../connector/SocketSinkImporterConnector.java     | 23 +++++--
 .../cdc/core/importer/SocketSinkImporter.java      |  2 +-
 .../data/pipeline/cdc/util/CDCDataRecordUtils.java | 59 ++++++++--------
 .../yaml/job/YamlCDCJobConfigurationSwapper.java   |  2 +-
 .../pipeline/cdc/util/CDCDataRecordUtilsTest.java  | 30 ++++----
 .../pipeline/core/importer/DataSourceImporter.java | 29 ++++----
 .../memory/MemoryPipelineChannelCreator.java       |  5 +-
 .../memory/MultiplexMemoryPipelineChannel.java     | 34 +++++++---
 .../memory/SimpleMemoryPipelineChannel.java        | 22 +++---
 .../core/ingest/dumper/InventoryDumper.java        | 14 +++-
 .../data/pipeline/core/record/RecordUtils.java     | 19 ------
 .../data/pipeline/core/task/IncrementalTask.java   |  2 +-
 .../data/pipeline/core/task/InventoryTask.java     | 13 ++--
 .../spi/ingest/channel/PipelineChannelCreator.java |  3 +-
 .../memory/MemoryPipelineChannelCreatorTest.java   |  4 +-
 .../memory/MultiplexMemoryPipelineChannelTest.java |  4 +-
 .../memory/SimpleMemoryPipelineChannelTest.java    | 41 +++++++++++
 .../data/pipeline/core/record/RecordUtilsTest.java | 17 -----
 .../mysql/ingest/MySQLIncrementalDumper.java       | 79 +++++++++++++++-------
 .../mysql/ingest/binlog/event/QueryEvent.java}     | 35 ++++++----
 .../mysql/ingest/binlog/event/XidEvent.java}       | 24 +++----
 .../pipeline/mysql/ingest/client/MySQLClient.java  | 30 ++++++--
 .../netty/MySQLBinlogEventPacketDecoder.java       | 77 ++++++++++++++++++++-
 .../mysql/ingest/MySQLIncrementalDumperTest.java   | 45 ++++++------
 .../mysql/ingest/client/MySQLClientTest.java       |  2 +-
 .../netty/MySQLBinlogEventPacketDecoderTest.java   | 26 ++++---
 .../opengauss/ingest/OpenGaussWALDumper.java       | 20 +++---
 .../postgresql/ingest/PostgreSQLWALDumper.java     | 45 +++++++++++-
 .../ingest/wal/decode/TestDecodingPlugin.java      | 11 ++-
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |  6 +-
 .../core/importer/DataSourceImporterTest.java      |  3 +-
 32 files changed, 476 insertions(+), 254 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
index 5fa2b8ce353..25ce5a44a59 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
@@ -30,9 +30,9 @@ public interface PipelineChannel {
     /**
      * Push {@code DataRecord} into channel.
      *
-     * @param dataRecord data
+     * @param dataRecords data records
      */
-    void pushRecord(Record dataRecord);
+    void pushRecords(List<Record> dataRecords);
     
     /**
      * Fetch {@code Record} list from channel.
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index b326565741e..905adc5fef8 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -34,13 +34,13 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerato
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -78,7 +78,7 @@ public final class SocketSinkImporterConnector implements 
ImporterConnector, Aut
     
     private final Map<String, String> tableNameSchemaMap = new HashMap<>();
     
-    private final Map<SocketSinkImporter, BlockingQueue<Record>> 
incrementalRecordMap = new ConcurrentHashMap<>();
+    private final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> 
incrementalRecordMap = new ConcurrentHashMap<>();
     
     private final AtomicInteger runningIncrementalTaskCount = new 
AtomicInteger(0);
     
@@ -120,7 +120,7 @@ public final class SocketSinkImporterConnector implements 
ImporterConnector, Aut
                 return;
             }
             Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = 
new HashMap<>();
-            importerDataRecordMap.put(socketSinkImporter, new 
CDCAckPosition(RecordUtils.getLastNormalRecord(recordList), dataRecordCount));
+            importerDataRecordMap.put(socketSinkImporter, new 
CDCAckPosition(lastRecord, dataRecordCount));
             writeImmediately(recordList, importerDataRecordMap);
         } else if (ImporterType.INCREMENTAL == importerType) {
             writeIntoQueue(recordList, socketSinkImporter);
@@ -160,12 +160,21 @@ public final class SocketSinkImporterConnector implements 
ImporterConnector, Aut
     
     @SneakyThrows(InterruptedException.class)
     private void writeIntoQueue(final List<Record> dataRecords, final 
SocketSinkImporter socketSinkImporter) {
-        BlockingQueue<Record> blockingQueue = 
incrementalRecordMap.get(socketSinkImporter);
+        BlockingQueue<List<DataRecord>> blockingQueue = 
incrementalRecordMap.get(socketSinkImporter);
         if (null == blockingQueue) {
             log.warn("not find the queue to write");
             return;
         }
+        Map<Long, List<DataRecord>> recordsMap = new LinkedHashMap<>();
         for (Record each : dataRecords) {
+            if (!(each instanceof DataRecord)) {
+                continue;
+            }
+            DataRecord dataRecord = (DataRecord) each;
+            // TODO need improve if support global transaction
+            recordsMap.computeIfAbsent(dataRecord.getCsn(), ignored -> new 
LinkedList<>()).add(dataRecord);
+        }
+        for (List<DataRecord> each : recordsMap.values()) {
             blockingQueue.put(each);
         }
     }
@@ -223,11 +232,11 @@ public final class SocketSinkImporterConnector implements 
ImporterConnector, Aut
                 Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = 
new HashMap<>();
                 List<DataRecord> dataRecords = new LinkedList<>();
                 for (int i = 0; i < batchSize; i++) {
-                    DataRecord minimumDataRecord = 
CDCDataRecordUtils.findMinimumDataRecordAndSavePosition(incrementalRecordMap, 
dataRecordComparator, cdcAckPositionMap);
-                    if (null == minimumDataRecord) {
+                    List<DataRecord> minimumRecords = 
CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(incrementalRecordMap, 
dataRecordComparator, cdcAckPositionMap);
+                    if (minimumRecords.isEmpty()) {
                         break;
                     }
-                    dataRecords.add(minimumDataRecord);
+                    dataRecords.addAll(minimumRecords);
                 }
                 if (dataRecords.isEmpty()) {
                     Thread.sleep(200L);
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
index 691b5800028..a6eb2942911 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
@@ -80,7 +80,7 @@ public final class SocketSinkImporter extends 
AbstractLifecycleExecutor implemen
         }
         while (isRunning()) {
             List<Record> records = channel.fetchRecords(batchSize, 500, 
TimeUnit.MILLISECONDS);
-            if (null != records && !records.isEmpty()) {
+            if (!records.isEmpty()) {
                 List<Record> recordList = records.stream().filter(each -> 
!(each instanceof PlaceholderRecord)).collect(Collectors.toList());
                 processDataRecords(recordList);
                 if (FinishedRecord.class.equals(records.get(records.size() - 
1).getClass())) {
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
index 2a65306008a..6189f2ba161 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
@@ -24,8 +24,10 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
 
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
@@ -44,8 +46,8 @@ public final class CDCDataRecordUtils {
      * @param cdcAckPositionMap CDC ack position map.
      * @return minimum data record
      */
-    public static DataRecord findMinimumDataRecordAndSavePosition(final 
Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap, final 
Comparator<DataRecord> dataRecordComparator,
-                                                                  final 
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
+    public static List<DataRecord> findMinimumDataRecordsAndSavePosition(final 
Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
+                                                                         final 
Comparator<DataRecord> dataRecordComparator, final Map<SocketSinkImporter, 
CDCAckPosition> cdcAckPositionMap) {
         if (null == dataRecordComparator) {
             return 
findMinimumDataRecordWithoutComparator(incrementalRecordMap, cdcAckPositionMap);
         } else {
@@ -53,17 +55,18 @@ public final class CDCDataRecordUtils {
         }
     }
     
-    private static DataRecord findMinimumDataRecordWithoutComparator(final 
Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
-                                                                     final 
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
-        for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry : 
incrementalRecordMap.entrySet()) {
-            Record record = entry.getValue().poll();
-            if (!(record instanceof DataRecord)) {
+    private static List<DataRecord> 
findMinimumDataRecordWithoutComparator(final Map<SocketSinkImporter, 
BlockingQueue<List<DataRecord>>> incrementalRecordMap,
+                                                                           
final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
+        for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry 
: incrementalRecordMap.entrySet()) {
+            List<DataRecord> records = entry.getValue().poll();
+            if (null == records || records.isEmpty()) {
                 continue;
             }
-            saveAckPosition(cdcAckPositionMap, entry.getKey(), record);
-            return (DataRecord) record;
+            DataRecord lastRecord = records.get(records.size() - 1);
+            saveAckPosition(cdcAckPositionMap, entry.getKey(), lastRecord);
+            return records;
         }
-        return null;
+        return Collections.emptyList();
     }
     
     private static void saveAckPosition(final Map<SocketSinkImporter, 
CDCAckPosition> cdcAckPositionMap, final SocketSinkImporter socketSinkImporter, 
final Record record) {
@@ -76,39 +79,37 @@ public final class CDCDataRecordUtils {
         }
     }
     
-    private static DataRecord findMinimumDataRecordWithComparator(final 
Map<SocketSinkImporter, BlockingQueue<Record>> incrementalRecordMap,
-                                                                  final 
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final 
Comparator<DataRecord> dataRecordComparator) {
-        Map<SocketSinkImporter, DataRecord> waitSortedMap = new HashMap<>();
-        for (Entry<SocketSinkImporter, BlockingQueue<Record>> entry : 
incrementalRecordMap.entrySet()) {
-            Record peek = entry.getValue().peek();
+    private static List<DataRecord> findMinimumDataRecordWithComparator(final 
Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
+                                                                        final 
Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final 
Comparator<DataRecord> dataRecordComparator) {
+        Map<SocketSinkImporter, List<DataRecord>> waitSortedMap = new 
HashMap<>();
+        for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry 
: incrementalRecordMap.entrySet()) {
+            List<DataRecord> peek = entry.getValue().peek();
             if (null == peek) {
                 continue;
             }
-            if (peek instanceof DataRecord) {
-                waitSortedMap.put(entry.getKey(), (DataRecord) peek);
-            }
+            waitSortedMap.put(entry.getKey(), peek);
         }
         if (waitSortedMap.isEmpty()) {
-            return null;
+            return Collections.emptyList();
         }
-        DataRecord minRecord = null;
+        List<DataRecord> result = null;
         SocketSinkImporter belongImporter = null;
-        for (Entry<SocketSinkImporter, DataRecord> entry : 
waitSortedMap.entrySet()) {
-            if (null == minRecord) {
-                minRecord = entry.getValue();
+        for (Entry<SocketSinkImporter, List<DataRecord>> entry : 
waitSortedMap.entrySet()) {
+            if (null == result) {
+                result = entry.getValue();
                 belongImporter = entry.getKey();
                 continue;
             }
-            if (dataRecordComparator.compare(minRecord, entry.getValue()) > 0) 
{
-                minRecord = entry.getValue();
+            if (dataRecordComparator.compare(result.get(0), 
entry.getValue().get(0)) > 0) {
+                result = entry.getValue();
                 belongImporter = entry.getKey();
             }
         }
-        if (null == minRecord) {
-            return null;
+        if (null == result) {
+            return Collections.emptyList();
         }
         incrementalRecordMap.get(belongImporter).poll();
-        saveAckPosition(cdcAckPositionMap, belongImporter, minRecord);
-        return minRecord;
+        saveAckPosition(cdcAckPositionMap, belongImporter, 
result.get(result.size() - 1));
+        return result;
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 8041712e616..979a583388d 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -53,7 +53,7 @@ public final class YamlCDCJobConfigurationSwapper implements 
YamlConfigurationSw
         result.setDecodeWithTX(data.isDecodeWithTX());
         
result.setSinkConfig(swapToYamlSinkConfiguration(data.getSinkConfig()));
         result.setConcurrency(data.getConcurrency());
-        result.setRetryTimes(0);
+        result.setRetryTimes(data.getRetryTimes());
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
index 575d221de2e..514c948b6fb 100644
--- 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
@@ -19,48 +19,50 @@ package org.apache.shardingsphere.data.pipeline.cdc.util;
 
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 class CDCDataRecordUtilsTest {
     
     @Test
     void assertFindMinimumDataRecordAndSavePosition() throws 
InterruptedException {
-        final Map<SocketSinkImporter, BlockingQueue<Record>> 
actualIncrementalRecordMap = new HashMap<>();
-        ArrayBlockingQueue<Record> queueFirst = new ArrayBlockingQueue<>(5);
-        queueFirst.put(generateDataRecord(0));
-        queueFirst.put(generateDataRecord(2));
-        queueFirst.put(generateDataRecord(4));
+        final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> 
actualIncrementalRecordMap = new HashMap<>();
+        ArrayBlockingQueue<List<DataRecord>> queueFirst = new 
ArrayBlockingQueue<>(5);
+        queueFirst.put(Collections.singletonList(generateDataRecord(0)));
+        queueFirst.put(Collections.singletonList(generateDataRecord(2)));
+        queueFirst.put(Collections.singletonList(generateDataRecord(4)));
         SocketSinkImporter mockSocketSinkImporterFirst = 
mock(SocketSinkImporter.class);
         actualIncrementalRecordMap.put(mockSocketSinkImporterFirst, 
queueFirst);
-        ArrayBlockingQueue<Record> queueSecond = new ArrayBlockingQueue<>(5);
-        queueSecond.put(generateDataRecord(1));
-        queueSecond.put(generateDataRecord(3));
-        queueSecond.put(generateDataRecord(5));
+        ArrayBlockingQueue<List<DataRecord>> queueSecond = new 
ArrayBlockingQueue<>(5);
+        queueSecond.put(Collections.singletonList(generateDataRecord(1)));
+        queueSecond.put(Collections.singletonList(generateDataRecord(3)));
+        queueSecond.put(Collections.singletonList(generateDataRecord(5)));
         SocketSinkImporter mockSocketSinkImporterSecond = 
mock(SocketSinkImporter.class);
         actualIncrementalRecordMap.put(mockSocketSinkImporterSecond, 
queueSecond);
         Comparator<DataRecord> dataRecordComparator = 
DataRecordComparatorGenerator.generatorIncrementalComparator(new 
OpenGaussDatabaseType());
         final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new 
HashMap<>();
         for (long i = 0; i <= 5; i++) {
-            DataRecord minimumDataRecord = 
CDCDataRecordUtils.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap,
 dataRecordComparator, cdcAckPositionMap);
-            assertThat(minimumDataRecord.getCsn(), is(i));
+            List<DataRecord> minimumDataRecord = 
CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(actualIncrementalRecordMap,
 dataRecordComparator, cdcAckPositionMap);
+            assertThat(minimumDataRecord.size(), is(1));
+            assertThat(minimumDataRecord.get(0).getCsn(), is(i));
         }
-        
assertNull(CDCDataRecordUtils.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap,
 dataRecordComparator, cdcAckPositionMap));
+        
assertTrue(CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(actualIncrementalRecordMap,
 dataRecordComparator, cdcAckPositionMap).isEmpty());
     }
     
     private DataRecord generateDataRecord(final long csn) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 1514e76ba6b..2a662553ec0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -99,7 +99,7 @@ public final class DataSourceImporter extends 
AbstractLifecycleExecutor implemen
         int batchSize = importerConfig.getBatchSize();
         while (isRunning()) {
             List<Record> records = channel.fetchRecords(batchSize, 3, 
TimeUnit.SECONDS);
-            if (null != records && !records.isEmpty()) {
+            if (!records.isEmpty()) {
                 PipelineJobProgressUpdatedParameter updatedParam = 
flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), 
records);
                 channel.ack(records);
                 jobProgressListener.onProgressUpdated(updatedParam);
@@ -266,12 +266,13 @@ public final class DataSourceImporter extends 
AbstractLifecycleExecutor implemen
     
     private void executeBatchDelete(final Connection connection, final 
List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        List<Column> conditionColumns = 
RecordUtils.extractConditionColumns(dataRecord, 
importerConfig.getShardingColumns(dataRecord.getTableName()));
-        String deleteSQL = 
pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), 
dataRecord, conditionColumns);
+        String deleteSQL = 
pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), 
dataRecord,
+                RecordUtils.extractConditionColumns(dataRecord, 
importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = 
connection.prepareStatement(deleteSQL)) {
             batchDeleteStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
+                List<Column> conditionColumns = 
RecordUtils.extractConditionColumns(each, 
importerConfig.getShardingColumns(dataRecord.getTableName()));
                 for (int i = 0; i < conditionColumns.size(); i++) {
                     Object oldValue = conditionColumns.get(i).getOldValue();
                     if (null == oldValue) {
@@ -283,7 +284,7 @@ public final class DataSourceImporter extends 
AbstractLifecycleExecutor implemen
             }
             int[] counts = preparedStatement.executeBatch();
             if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
-                log.warn("batchDelete failed, counts={}, sql={}, 
conditionColumns={}", Arrays.toString(counts), deleteSQL, conditionColumns);
+                log.warn("batchDelete failed, counts={}, sql={}, 
dataRecords={}", Arrays.toString(counts), deleteSQL, dataRecords);
             }
         } finally {
             batchDeleteStatement.set(null);
@@ -295,23 +296,19 @@ public final class DataSourceImporter extends 
AbstractLifecycleExecutor implemen
             return;
         }
         try (Connection connection = dataSource.getConnection()) {
-            sequentialFlush(connection, buffer);
+            // TODO it's better use transaction, but execute delete maybe not 
effect when open transaction of PostgreSQL sometimes
+            for (DataRecord each : buffer) {
+                try {
+                    doFlush(connection, each);
+                } catch (final SQLException ex) {
+                    throw new 
PipelineImporterJobWriteException(String.format("Write failed, record=%s", 
each), ex);
+                }
+            }
         } catch (final SQLException ex) {
             throw new PipelineImporterJobWriteException(ex);
         }
     }
     
-    private void sequentialFlush(final Connection connection, final 
List<DataRecord> buffer) {
-        // TODO it's better use transaction, but execute delete maybe not 
effect when open transaction of PostgreSQL sometimes
-        for (DataRecord each : buffer) {
-            try {
-                doFlush(connection, each);
-            } catch (final SQLException ex) {
-                throw new 
PipelineImporterJobWriteException(String.format("Write failed, record=%s", 
each), ex);
-            }
-        }
-    }
-    
     @Override
     protected void doStop() throws SQLException {
         cancelStatement(batchInsertStatement.get());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
index ede119b8d74..63b16b247e0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
@@ -40,8 +40,9 @@ public final class MemoryPipelineChannelCreator implements 
PipelineChannelCreato
     }
     
     @Override
-    public PipelineChannel createPipelineChannel(final int outputConcurrency, 
final AckCallback ackCallback) {
-        return 1 == outputConcurrency ? new 
SimpleMemoryPipelineChannel(blockQueueSize, ackCallback) : new 
MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize, ackCallback);
+    public PipelineChannel createPipelineChannel(final int outputConcurrency, 
final int averageElementSize, final AckCallback ackCallback) {
+        return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel((int) 
Math.ceil((double) blockQueueSize / averageElementSize), ackCallback)
+                : new MultiplexMemoryPipelineChannel(outputConcurrency, 
blockQueueSize, ackCallback);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 50bc4df6ad8..2e09b725c51 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -23,7 +23,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,25 +50,37 @@ public final class MultiplexMemoryPipelineChannel 
implements PipelineChannel {
     }
     
     @Override
-    public void pushRecord(final Record record) {
-        if (FinishedRecord.class.equals(record.getClass())) {
+    public void pushRecords(final List<Record> records) {
+        Record firstRecord = records.get(0);
+        if (1 == records.size()) {
+            pushRecord(firstRecord);
+            return;
+        }
+        long insertDataRecordsCount = 
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each
 -> IngestDataChangeType.INSERT.equals(each.getType())).count();
+        if (insertDataRecordsCount == records.size()) {
+            channels.get(Math.abs(firstRecord.hashCode() % 
channelNumber)).pushRecords(records);
+            return;
+        }
+        for (Record record : records) {
+            pushRecord(record);
+        }
+    }
+    
+    private void pushRecord(final Record record) {
+        List<Record> records = Collections.singletonList(record);
+        if (record instanceof FinishedRecord) {
             for (int i = 0; i < channelNumber; i++) {
-                pushRecord(record, i);
+                channels.get(i).pushRecords(records);
             }
         } else if (DataRecord.class.equals(record.getClass())) {
-            pushRecord(record, Math.abs(record.hashCode() % channelNumber));
+            channels.get(Math.abs(record.hashCode() % 
channelNumber)).pushRecords(records);
         } else if (PlaceholderRecord.class.equals(record.getClass())) {
-            pushRecord(record, 0);
+            channels.get(0).pushRecords(records);
         } else {
             throw new UnsupportedOperationException("Unsupported record type: 
" + record.getClass().getName());
         }
     }
     
-    private void pushRecord(final Record record, final int channelIndex) {
-        PipelineChannel channel = channels.get(channelIndex);
-        channel.pushRecord(record);
-    }
-    
     @Override
     public List<Record> fetchRecords(final int batchSize, final int timeout, 
final TimeUnit timeUnit) {
         return findChannel().fetchRecords(batchSize, timeout, timeUnit);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 954e79ab9a9..605c3d25849 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
  */
 public final class SimpleMemoryPipelineChannel implements PipelineChannel {
     
-    private final BlockingQueue<Record> queue;
+    private final BlockingQueue<List<Record>> queue;
     
     private final AckCallback ackCallback;
     
@@ -44,23 +44,29 @@ public final class SimpleMemoryPipelineChannel implements 
PipelineChannel {
     
     @SneakyThrows(InterruptedException.class)
     @Override
-    public void pushRecord(final Record dataRecord) {
-        queue.put(dataRecord);
+    public void pushRecords(final List<Record> records) {
+        queue.put(records);
     }
     
     @SneakyThrows(InterruptedException.class)
     // TODO thread-safe?
     @Override
     public List<Record> fetchRecords(final int batchSize, final int timeout, 
final TimeUnit timeUnit) {
-        List<Record> result = new ArrayList<>(batchSize);
+        List<Record> result = new LinkedList<>();
         long start = System.currentTimeMillis();
-        while (batchSize > queue.size()) {
+        int recordsCount = 0;
+        while (batchSize > recordsCount) {
+            List<Record> records = queue.poll();
+            if (null == records || records.isEmpty()) {
+                TimeUnit.MILLISECONDS.sleep(Math.min(100, 
timeUnit.toMillis(timeout)));
+            } else {
+                recordsCount += records.size();
+                result.addAll(records);
+            }
             if (timeUnit.toMillis(timeout) <= System.currentTimeMillis() - 
start) {
                 break;
             }
-            TimeUnit.MILLISECONDS.sleep(100L);
         }
-        queue.drainTo(result, batchSize);
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 8dfc0048cd4..d1562e68dc4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -33,6 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPos
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -59,6 +60,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
@@ -108,7 +110,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
             log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
             throw new IngestException("Inventory dump failed on " + 
dumperConfig.getActualTableName(), ex);
         } finally {
-            channel.pushRecord(new FinishedRecord(new FinishedPosition()));
+            channel.pushRecords(Collections.singletonList(new 
FinishedRecord(new FinishedPosition())));
         }
     }
     
@@ -128,9 +130,14 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
                 int rowCount = 0;
                 JobRateLimitAlgorithm rateLimitAlgorithm = 
dumperConfig.getRateLimitAlgorithm();
                 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+                List<Record> dataRecords = new LinkedList<>();
                 while (resultSet.next()) {
-                    channel.pushRecord(loadDataRecord(resultSet, 
resultSetMetaData, tableMetaData));
+                    dataRecords.add(loadDataRecord(resultSet, 
resultSetMetaData, tableMetaData));
                     ++rowCount;
+                    if (dataRecords.size() >= batchSize) {
+                        channel.pushRecords(dataRecords);
+                        dataRecords = new LinkedList<>();
+                    }
                     if (!isRunning()) {
                         log.info("Broke because of inventory dump is not 
running.");
                         break;
@@ -139,6 +146,9 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
                         rateLimitAlgorithm.intercept(JobOperationType.SELECT, 
1);
                     }
                 }
+                if (!dataRecords.isEmpty()) {
+                    channel.pushRecords(dataRecords);
+                }
                 dumpStatement.set(null);
                 log.info("Inventory dump done, rowCount={}", rowCount);
             }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
index 68336abd7c9..ac27b0c7c9b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
@@ -19,10 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.record;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -87,21 +85,4 @@ public final class RecordUtils {
         }
         return result;
     }
-    
-    /**
-     * Get last normal record.
-     *
-     * @param records records
-     * @return last normal record.
-     */
-    public static Record getLastNormalRecord(final List<Record> records) {
-        for (int index = records.size() - 1; index >= 0; index--) {
-            Record record = records.get(index);
-            if (record.getPosition() instanceof PlaceholderPosition) {
-                continue;
-            }
-            return record;
-        }
-        return null;
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index b081785bd40..75426260d90 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -105,7 +105,7 @@ public final class IncrementalTask implements PipelineTask, 
AutoCloseable {
     }
     
     private PipelineChannel createChannel(final int concurrency, final 
PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress 
progress) {
-        return pipelineChannelCreator.createPipelineChannel(concurrency, 
records -> {
+        return pipelineChannelCreator.createPipelineChannel(concurrency, 5, 
records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
             if (!(lastHandledRecord.getPosition() instanceof 
PlaceholderPosition)) {
                 progress.setPosition(lastHandledRecord.getPosition());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 7aa477675a6..852230ed3c0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -34,7 +34,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskPr
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
@@ -76,7 +75,7 @@ public final class InventoryTask implements PipelineTask, 
AutoCloseable {
         taskId = generateTaskId(inventoryDumperConfig);
         this.inventoryDumperExecuteEngine = inventoryDumperExecuteEngine;
         this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
-        channel = createChannel(pipelineChannelCreator);
+        channel = createChannel(pipelineChannelCreator, 
importerConfig.getBatchSize());
         dumper = new InventoryDumper(inventoryDumperConfig, channel, 
sourceDataSource, sourceMetaDataLoader);
         importer = TypedSPILoader.getService(ImporterCreator.class,
                 importerConnector.getType()).createImporter(importerConfig, 
importerConnector, channel, jobProgressListener, ImporterType.INVENTORY);
@@ -120,12 +119,10 @@ public final class InventoryTask implements PipelineTask, 
AutoCloseable {
         return result;
     }
     
-    private PipelineChannel createChannel(final PipelineChannelCreator 
pipelineChannelCreator) {
-        return pipelineChannelCreator.createPipelineChannel(1, records -> {
-            Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
-            if (null != lastNormalRecord) {
-                position.set(lastNormalRecord.getPosition());
-            }
+    private PipelineChannel createChannel(final PipelineChannelCreator 
pipelineChannelCreator, final int batchSize) {
+        return pipelineChannelCreator.createPipelineChannel(1, batchSize, 
records -> {
+            Record lastRecord = records.get(records.size() - 1);
+            position.set(lastRecord.getPosition());
         });
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
index d9ac1de2854..4a1b6d2dc0e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
@@ -30,8 +30,9 @@ public interface PipelineChannelCreator extends TypedSPI {
      * Create pipeline channel.
      *
      * @param outputConcurrency output concurrency
+     * @param averageElementSize average element size, affects the size of the 
queue
      * @param ackCallback ack callback
      * @return {@link PipelineChannel}
      */
-    PipelineChannel createPipelineChannel(int outputConcurrency, AckCallback 
ackCallback);
+    PipelineChannel createPipelineChannel(int outputConcurrency, int 
averageElementSize, AckCallback ackCallback);
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreatorTest.java
index d3fb83b6283..4e1c839c3fd 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreatorTest.java
@@ -46,11 +46,11 @@ class MemoryPipelineChannelCreatorTest {
     
     @Test
     void assertCreateSimpleMemoryPipelineChannel() {
-        assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, 
"MEMORY").createPipelineChannel(1, mock(AckCallback.class)), 
instanceOf(SimpleMemoryPipelineChannel.class));
+        assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, 
"MEMORY").createPipelineChannel(1, 1, mock(AckCallback.class)), 
instanceOf(SimpleMemoryPipelineChannel.class));
     }
     
     @Test
     void assertCreateMultiplexMemoryPipelineChannel() {
-        assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, 
"MEMORY").createPipelineChannel(2, mock(AckCallback.class)), 
instanceOf(MultiplexMemoryPipelineChannel.class));
+        assertThat(TypedSPILoader.getService(PipelineChannelCreator.class, 
"MEMORY").createPipelineChannel(2, 1, mock(AckCallback.class)), 
instanceOf(MultiplexMemoryPipelineChannel.class));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 75099e77fb7..2d0b1d258cf 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -70,9 +70,7 @@ class MultiplexMemoryPipelineChannelTest {
         CountDownLatch countDownLatch = new CountDownLatch(recordCount);
         MultiplexMemoryPipelineChannel memoryChannel = new 
MultiplexMemoryPipelineChannel(CHANNEL_NUMBER, 10000, ackCallback);
         fetchWithMultiThreads(memoryChannel, countDownLatch);
-        for (Record record : records) {
-            memoryChannel.pushRecord(record);
-        }
+        memoryChannel.pushRecords(Arrays.asList(records));
         boolean awaitResult = countDownLatch.await(10, TimeUnit.SECONDS);
         assertTrue(awaitResult, "await failed");
         memoryChannel.close();
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
new file mode 100644
index 00000000000..0666bc8c258
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory;
+
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SimpleMemoryPipelineChannelTest {
+    
+    @Test
+    void assertFetchRecordsTimeoutCorrectly() {
+        SimpleMemoryPipelineChannel simpleMemoryPipelineChannel = new 
SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
+        long startMills = System.currentTimeMillis();
+        simpleMemoryPipelineChannel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
+        long endMills = System.currentTimeMillis();
+        assertTrue(endMills - startMills >= 1 && endMills - startMills < 50);
+        startMills = System.currentTimeMillis();
+        simpleMemoryPipelineChannel.fetchRecords(1, 500, 
TimeUnit.MILLISECONDS);
+        endMills = System.currentTimeMillis();
+        assertTrue(endMills - startMills >= 500 && endMills - startMills < 
600);
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
index 6206d792aec..0f5c00a80e9 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
@@ -17,23 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.record;
 
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 import static org.hamcrest.CoreMatchers.hasItems;
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -73,14 +66,4 @@ class RecordUtilsTest {
         result.addColumn(new Column("c3", "", true, false));
         return result;
     }
-    
-    @Test
-    void assertGetLastNormalRecord() {
-        List<Record> actual = Arrays.asList(new DataRecord(new 
IntegerPrimaryKeyPosition(0, 1), 0), new PlaceholderRecord(new 
PlaceholderPosition()));
-        Record expected = RecordUtils.getLastNormalRecord(actual);
-        assertThat(expected, instanceOf(DataRecord.class));
-        actual = Arrays.asList(new DataRecord(new IntegerPrimaryKeyPosition(0, 
1), 0), new PlaceholderRecord(new PlaceholderPosition()), new 
FinishedRecord(new FinishedPosition()));
-        expected = RecordUtils.getLastNormalRecord(actual);
-        assertThat(expected, instanceOf(FinishedRecord.class));
-    }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 772060603ca..71f7c8f0f55 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -25,12 +25,13 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJd
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -42,7 +43,6 @@ import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPositio
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
@@ -55,6 +55,9 @@ import 
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
@@ -88,7 +91,8 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
         YamlJdbcConfiguration jdbcConfig = 
((StandardPipelineDataSourceConfiguration) 
dumperConfig.getDataSourceConfig()).getJdbcConfig();
         log.info("incremental dump, jdbcUrl={}", jdbcConfig.getUrl());
         DataSourceMetaData metaData = 
TypedSPILoader.getService(DatabaseType.class, 
"MySQL").getDataSourceMetaData(jdbcConfig.getUrl(), null);
-        client = new MySQLClient(new ConnectInfo(new Random().nextInt(), 
metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), 
jdbcConfig.getPassword()));
+        ConnectInfo connectInfo = new ConnectInfo(new Random().nextInt(), 
metaData.getHostname(), metaData.getPort(), jdbcConfig.getUsername(), 
jdbcConfig.getPassword());
+        client = new MySQLClient(connectInfo, dumperConfig.isDecodeWithTX());
         catalog = metaData.getCatalog();
     }
     
@@ -97,46 +101,64 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
         client.connect();
         client.subscribe(binlogPosition.getFilename(), 
binlogPosition.getPosition());
         while (isRunning()) {
-            AbstractBinlogEvent event = client.poll();
-            if (null == event) {
+            List<AbstractBinlogEvent> events = client.poll();
+            if (events.isEmpty()) {
                 continue;
             }
-            handleEvent(event);
+            handleEvents(events);
         }
-        channel.pushRecord(new FinishedRecord(new PlaceholderPosition()));
+        channel.pushRecords(Collections.singletonList(new FinishedRecord(new 
FinishedPosition())));
     }
     
-    private void handleEvent(final AbstractBinlogEvent event) {
-        if (event instanceof PlaceholderEvent || !((AbstractRowsEvent) 
event).getDatabaseName().equals(catalog) || 
!dumperConfig.containsTable(((AbstractRowsEvent) event).getTableName())) {
-            createPlaceholderRecord(event);
+    private void handleEvents(final List<AbstractBinlogEvent> events) {
+        List<Record> dataRecords = new LinkedList<>();
+        for (AbstractBinlogEvent each : events) {
+            if (!(each instanceof AbstractRowsEvent)) {
+                dataRecords.add(createPlaceholderRecord(each));
+                continue;
+            }
+            dataRecords.addAll(handleEvent(each));
+        }
+        if (dataRecords.isEmpty()) {
             return;
         }
-        PipelineTableMetaData tableMetaData = 
getPipelineTableMetaData(((AbstractRowsEvent) event).getTableName());
+        channel.pushRecords(dataRecords);
+    }
+    
+    private List<? extends Record> handleEvent(final AbstractBinlogEvent 
event) {
+        if (!(event instanceof AbstractRowsEvent)) {
+            return Collections.singletonList(createPlaceholderRecord(event));
+        }
+        AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
+        if (!rowsEvent.getDatabaseName().equals(catalog) || 
!dumperConfig.containsTable(rowsEvent.getTableName())) {
+            return Collections.singletonList(createPlaceholderRecord(event));
+        }
+        PipelineTableMetaData tableMetaData = 
getPipelineTableMetaData(rowsEvent.getTableName());
         if (event instanceof WriteRowsEvent) {
-            handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
-            return;
+            return handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
         }
         if (event instanceof UpdateRowsEvent) {
-            handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
-            return;
+            return handleUpdateRowsEvent((UpdateRowsEvent) event, 
tableMetaData);
         }
         if (event instanceof DeleteRowsEvent) {
-            handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
+            return handleDeleteRowsEvent((DeleteRowsEvent) event, 
tableMetaData);
         }
+        return Collections.emptyList();
     }
     
-    private void createPlaceholderRecord(final AbstractBinlogEvent event) {
-        PlaceholderRecord placeholderRecord = new PlaceholderRecord(new 
BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
-        placeholderRecord.setCommitTime(event.getTimestamp() * 1000L);
-        channel.pushRecord(placeholderRecord);
+    private PlaceholderRecord createPlaceholderRecord(final 
AbstractBinlogEvent event) {
+        PlaceholderRecord result = new PlaceholderRecord(new 
BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
+        result.setCommitTime(event.getTimestamp() * 1000L);
+        return result;
     }
     
     private PipelineTableMetaData getPipelineTableMetaData(final String 
actualTableName) {
         return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new 
ActualTableName(actualTableName)), actualTableName);
     }
     
-    private void handleWriteRowsEvent(final WriteRowsEvent event, final 
PipelineTableMetaData tableMetaData) {
+    private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, 
final PipelineTableMetaData tableMetaData) {
         Set<ColumnName> columnNameSet = 
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+        List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getAfterRows()) {
             DataRecord dataRecord = createDataRecord(event, each.length);
             dataRecord.setType(IngestDataChangeType.INSERT);
@@ -147,16 +169,18 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
                 }
                 dataRecord.addColumn(new Column(columnMetaData.getName(), 
handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
             }
-            channel.pushRecord(dataRecord);
+            result.add(dataRecord);
         }
+        return result;
     }
     
     private boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, 
final String columnName) {
         return null != columnNameSet && !columnNameSet.contains(new 
ColumnName(columnName));
     }
     
-    private void handleUpdateRowsEvent(final UpdateRowsEvent event, final 
PipelineTableMetaData tableMetaData) {
+    private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent 
event, final PipelineTableMetaData tableMetaData) {
         Set<ColumnName> columnNameSet = 
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+        List<DataRecord> result = new LinkedList<>();
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
             Serializable[] afterValues = event.getAfterRows().get(i);
@@ -174,12 +198,14 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
                         handleValue(columnMetaData, oldValue),
                         handleValue(columnMetaData, newValue), updated, 
columnMetaData.isUniqueKey()));
             }
-            channel.pushRecord(dataRecord);
+            result.add(dataRecord);
         }
+        return result;
     }
     
-    private void handleDeleteRowsEvent(final DeleteRowsEvent event, final 
PipelineTableMetaData tableMetaData) {
+    private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent 
event, final PipelineTableMetaData tableMetaData) {
         Set<ColumnName> columnNameSet = 
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+        List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getBeforeRows()) {
             DataRecord dataRecord = createDataRecord(event, each.length);
             dataRecord.setType(IngestDataChangeType.DELETE);
@@ -190,8 +216,9 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
                 }
                 dataRecord.addColumn(new Column(columnMetaData.getName(), 
handleValue(columnMetaData, each[i]), null, true, 
columnMetaData.isUniqueKey()));
             }
-            channel.pushRecord(dataRecord);
+            result.add(dataRecord);
         }
+        return result;
     }
     
     private Serializable handleValue(final PipelineColumnMetaData 
columnMetaData, final Serializable value) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java
similarity index 50%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
copy to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java
index d9ac1de2854..e006c61d7d6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/QueryEvent.java
@@ -15,23 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
 
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
 /**
- * Pipeline channel creator.
+ * Query event. This event is written into the binary log file for:
+ * 1. STATEMENT based replication (updating statements)
+ * 2. DDLs
+ * 3. COMMIT related to non transactional engines (MyISAM, BLACKHOLE etc).
+ *
+ * @see <a 
href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_query">QUERY_EVENT</a>
  */
-public interface PipelineChannelCreator extends TypedSPI {
+@Getter
+@RequiredArgsConstructor
+public final class QueryEvent extends AbstractBinlogEvent {
+    
+    private final long threadId;
+    
+    private final long executionTime;
+    
+    private final int errorCode;
+    
+    private final String databaseName;
     
-    /**
-     * Create pipeline channel.
-     *
-     * @param outputConcurrency output concurrency
-     * @param ackCallback ack callback
-     * @return {@link PipelineChannel}
-     */
-    PipelineChannel createPipelineChannel(int outputConcurrency, AckCallback 
ackCallback);
+    private final String sql;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/XidEvent.java
similarity index 55%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
copy to 
kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/XidEvent.java
index d9ac1de2854..0e1bc1be469 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelCreator.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/event/XidEvent.java
@@ -15,23 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event;
 
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
 /**
- * Pipeline channel creator.
+ * XID event is generated for a COMMIT of a transaction that modifies one or 
more tables of an XA-capable storage engine.
+ *
+ * @see <a 
href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_xid">XID_EVENT</a>
  */
-public interface PipelineChannelCreator extends TypedSPI {
+@RequiredArgsConstructor
+@Getter
+public final class XidEvent extends AbstractBinlogEvent {
     
-    /**
-     * Create pipeline channel.
-     *
-     * @param outputConcurrency output concurrency
-     * @param ackCallback ack callback
-     * @return {@link PipelineChannel}
-     */
-    PipelineChannel createPipelineChannel(int outputConcurrency, AckCallback 
ackCallback);
+    private final long xid;
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index bec2dbd84c0..d2c9dbaf502 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -54,6 +54,8 @@ import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 
 import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -77,7 +79,7 @@ public final class MySQLClient {
     
     private Promise<Object> responseCallback;
     
-    private final ArrayBlockingQueue<AbstractBinlogEvent> blockingEventQueue = 
new ArrayBlockingQueue<>(10000);
+    private final ArrayBlockingQueue<List<AbstractBinlogEvent>> 
blockingEventQueue = new ArrayBlockingQueue<>(2500);
     
     private ServerInfo serverInfo;
     
@@ -85,6 +87,8 @@ public final class MySQLClient {
     
     private final AtomicInteger reconnectTimes = new AtomicInteger();
     
+    private final boolean decodeWithTX;
+    
     /**
      * Connect to MySQL.
      */
@@ -206,7 +210,7 @@ public final class MySQLClient {
         channel.pipeline().remove(MySQLCommandPacketDecoder.class);
         channel.pipeline().remove(MySQLCommandResponseHandler.class);
         String tableKey = String.join(":", connectInfo.getHost(), 
String.valueOf(connectInfo.getPort()));
-        channel.pipeline().addLast(new 
MySQLBinlogEventPacketDecoder(checksumLength, 
GlobalTableMapEventMapping.getTableMapEventMap(tableKey)));
+        channel.pipeline().addLast(new 
MySQLBinlogEventPacketDecoder(checksumLength, 
GlobalTableMapEventMapping.getTableMapEventMap(tableKey), decodeWithTX));
         channel.pipeline().addLast(new 
MySQLBinlogEventHandler(getLastBinlogEvent(binlogFileName, binlogPosition)));
         resetSequenceID();
         channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) 
binlogPosition, connectInfo.getServerId(), binlogFileName));
@@ -228,13 +232,14 @@ public final class MySQLClient {
      *
      * @return binlog event
      */
-    public synchronized AbstractBinlogEvent poll() {
+    public synchronized List<AbstractBinlogEvent> poll() {
         ShardingSpherePreconditions.checkState(running, 
BinlogSyncChannelAlreadyClosedException::new);
         try {
-            return blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
+            List<AbstractBinlogEvent> result = blockingEventQueue.poll(100L, 
TimeUnit.MILLISECONDS);
+            return null == result ? Collections.emptyList() : result;
         } catch (final InterruptedException ignored) {
             Thread.currentThread().interrupt();
-            return null;
+            return Collections.emptyList();
         }
     }
     
@@ -305,15 +310,26 @@ public final class MySQLClient {
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
         
+        @SuppressWarnings("unchecked")
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object 
msg) throws Exception {
             if (!running) {
                 return;
             }
+            reconnectTimes.set(0);
+            if (msg instanceof List) {
+                List<AbstractBinlogEvent> records = 
(List<AbstractBinlogEvent>) msg;
+                if (records.isEmpty()) {
+                    log.warn("The records is empty");
+                    return;
+                }
+                lastBinlogEvent.set(records.get(records.size() - 1));
+                blockingEventQueue.put(records);
+                return;
+            }
             if (msg instanceof AbstractBinlogEvent) {
                 lastBinlogEvent.set((AbstractBinlogEvent) msg);
-                blockingEventQueue.put(lastBinlogEvent.get());
-                reconnectTimes.set(0);
+                
blockingEventQueue.put(Collections.singletonList(lastBinlogEvent.get()));
             }
         }
         
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 98079fc3c97..14088dc3a1a 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -28,8 +28,10 @@ import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.Abstrac
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.QueryEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
 import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
@@ -39,6 +41,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlog
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -49,22 +52,41 @@ import java.util.Optional;
 @Slf4j
 public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
     
+    private static final String TX_BEGIN_SQL = "BEGIN";
+    
     private final BinlogContext binlogContext;
     
-    public MySQLBinlogEventPacketDecoder(final int checksumLength, final 
Map<Long, MySQLBinlogTableMapEventPacket> tableMap) {
+    private final boolean decodeWithTX;
+    
+    private List<AbstractBinlogEvent> records = new LinkedList<>();
+    
+    public MySQLBinlogEventPacketDecoder(final int checksumLength, final 
Map<Long, MySQLBinlogTableMapEventPacket> tableMap, final boolean decodeWithTX) 
{
+        this.decodeWithTX = decodeWithTX;
         binlogContext = new BinlogContext(checksumLength, tableMap);
     }
     
     @Override
     protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, 
final List<Object> out) {
-        // readable bytes must greater + statusCode(1b) + header-length(19b) +
         while (in.readableBytes() >= 1 + 
MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
             in.markReaderIndex();
             MySQLPacketPayload payload = new MySQLPacketPayload(in, 
ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
             checkPayload(payload);
             MySQLBinlogEventHeader binlogEventHeader = new 
MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
             if (checkEventIntegrity(in, binlogEventHeader)) {
-                decodeEvent(binlogEventHeader, payload).ifPresent(out::add);
+                Optional<AbstractBinlogEvent> binlogEvent = 
decodeEvent(binlogEventHeader, payload);
+                if (!binlogEvent.isPresent()) {
+                    skipChecksum(binlogEventHeader.getEventType(), in);
+                    return;
+                }
+                if (binlogEvent.get() instanceof PlaceholderEvent) {
+                    out.add(binlogEvent);
+                } else {
+                    if (decodeWithTX) {
+                        processEventWithTX(binlogEvent.get(), out);
+                    } else {
+                        processEventIgnoreTX(binlogEvent.get(), out);
+                    }
+                }
                 skipChecksum(binlogEventHeader.getEventType(), in);
             } else {
                 break;
@@ -98,6 +120,30 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         return true;
     }
     
+    private void processEventWithTX(final AbstractBinlogEvent binlogEvent, 
final List<Object> out) {
+        if (binlogEvent instanceof QueryEvent) {
+            QueryEvent queryEvent = (QueryEvent) binlogEvent;
+            if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
+                records = new LinkedList<>();
+            }
+        } else if (binlogEvent instanceof XidEvent) {
+            records.add(binlogEvent);
+            out.add(records);
+        } else {
+            records.add(binlogEvent);
+        }
+    }
+    
+    private void processEventIgnoreTX(final AbstractBinlogEvent binlogEvent, 
final List<Object> out) {
+        if (binlogEvent instanceof QueryEvent) {
+            QueryEvent queryEvent = (QueryEvent) binlogEvent;
+            if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
+                return;
+            }
+        }
+        out.add(binlogEvent);
+    }
+    
     private Optional<AbstractBinlogEvent> decodeEvent(final 
MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         switch 
(MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType()).orElse(MySQLBinlogEventType.UNKNOWN_EVENT))
 {
             case ROTATE_EVENT:
@@ -118,6 +164,10 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
             case DELETE_ROWS_EVENT_V1:
             case DELETE_ROWS_EVENT_V2:
                 return Optional.of(decodeDeleteRowsEventV2(binlogEventHeader, 
payload));
+            case QUERY_EVENT:
+                return 
Optional.of(decodeQueryEvent(binlogEventHeader.getChecksumLength(), payload));
+            case XID_EVENT:
+                return Optional.of(decodeXidEvent(binlogEventHeader, payload));
             default:
                 return Optional.of(decodePlaceholderEvent(binlogEventHeader, 
payload));
         }
@@ -191,6 +241,27 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
         return result;
     }
     
+    private QueryEvent decodeQueryEvent(final int checksumLength, final 
MySQLPacketPayload payload) {
+        int threadId = payload.readInt4();
+        int executionTime = payload.readInt4();
+        payload.skipReserved(1);
+        int errorCode = payload.readInt2();
+        payload.skipReserved(payload.readInt2());
+        String databaseName = payload.readStringNul();
+        String sql = 
payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength);
+        return new QueryEvent(threadId, executionTime, errorCode, 
databaseName, sql);
+    }
+    
+    private XidEvent decodeXidEvent(final MySQLBinlogEventHeader 
binlogEventHeader, final MySQLPacketPayload payload) {
+        XidEvent result = new XidEvent(payload.readInt8());
+        result.setFileName(binlogContext.getFileName());
+        result.setPosition(binlogEventHeader.getLogPos());
+        result.setTimestamp(binlogEventHeader.getTimestamp());
+        result.setServerId(binlogEventHeader.getServerId());
+        return result;
+    }
+    
+    // TODO May be used again later; keep this method for now.
     private PlaceholderEvent createPlaceholderEvent(final 
MySQLBinlogEventHeader binlogEventHeader) {
         PlaceholderEvent result = new PlaceholderEvent();
         result.setFileName(binlogContext.getFileName());
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 52f13528191..b386245eb57 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -23,7 +23,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
@@ -34,8 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableM
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
@@ -63,16 +61,19 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
+@SuppressWarnings("unchecked")
 class MySQLIncrementalDumperTest {
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
@@ -81,19 +82,18 @@ class MySQLIncrementalDumperTest {
     
     private MySQLIncrementalDumper incrementalDumper;
     
-    private MultiplexMemoryPipelineChannel channel;
-    
     private PipelineTableMetaData pipelineTableMetaData;
     
     @BeforeEach
     void setUp() {
         dumperConfig = mockDumperConfiguration();
         initTableData(dumperConfig);
-        dumperConfig.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", 
"root", "root"));
-        channel = new MultiplexMemoryPipelineChannel(1, 10000, new 
EmptyAckCallback());
-        PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        dumperConfig.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration("mock:mysql://127.0.0.1:3306/test", 
"root", "root"));
+        PipelineTableMetaDataLoader metaDataLoader = 
mock(PipelineTableMetaDataLoader.class);
+        SimpleMemoryPipelineChannel channel = new 
SimpleMemoryPipelineChannel(10000, new EmptyAckCallback());
         incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new 
BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader);
         pipelineTableMetaData = new PipelineTableMetaData("t_order", 
mockOrderColumnsMetaDataMap(), Collections.emptyList());
+        when(metaDataLoader.getTableMetaData(any(), 
any())).thenReturn(pipelineTableMetaData);
     }
     
     private DumperConfiguration mockDumperConfiguration() {
@@ -101,6 +101,7 @@ class MySQLIncrementalDumperTest {
         result.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
 "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new 
ActualTableName("t_order"), new LogicTableName("t_order")));
         result.setTableNameSchemaNameMapping(new 
TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTargetTableColumnsMap(Collections.singletonMap(new 
LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id"))));
         return result;
     }
     
@@ -150,8 +151,7 @@ class MySQLIncrementalDumperTest {
         rowsEvent.setTableName("t_order");
         rowsEvent.setAfterRows(Collections.singletonList(new 
Serializable[]{101, 1, "OK"}));
         Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", 
WriteRowsEvent.class, PipelineTableMetaData.class);
-        Plugins.getMemberAccessor().invoke(method, incrementalDumper, 
rowsEvent, pipelineTableMetaData);
-        List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, 
pipelineTableMetaData);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), 
is(IngestDataChangeType.INSERT));
@@ -175,13 +175,12 @@ class MySQLIncrementalDumperTest {
     private void assertUpdateRowsEvent0(final Map<LogicTableName, 
Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws 
ReflectiveOperationException {
         dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
         UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
-        rowsEvent.setDatabaseName("");
+        rowsEvent.setDatabaseName("test");
         rowsEvent.setTableName("t_order");
         rowsEvent.setBeforeRows(Collections.singletonList(new 
Serializable[]{101, 1, "OK"}));
         rowsEvent.setAfterRows(Collections.singletonList(new 
Serializable[]{101, 1, "OK2"}));
         Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", 
UpdateRowsEvent.class, PipelineTableMetaData.class);
-        Plugins.getMemberAccessor().invoke(method, incrementalDumper, 
rowsEvent, pipelineTableMetaData);
-        List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, 
pipelineTableMetaData);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), 
is(IngestDataChangeType.UPDATE));
@@ -205,8 +204,7 @@ class MySQLIncrementalDumperTest {
         rowsEvent.setTableName("t_order");
         rowsEvent.setBeforeRows(Collections.singletonList(new 
Serializable[]{101, 1, "OK"}));
         Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", 
DeleteRowsEvent.class, PipelineTableMetaData.class);
-        Plugins.getMemberAccessor().invoke(method, incrementalDumper, 
rowsEvent, pipelineTableMetaData);
-        List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, 
pipelineTableMetaData);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0), instanceOf(DataRecord.class));
         assertThat(((DataRecord) actual.get(0)).getType(), 
is(IngestDataChangeType.DELETE));
@@ -215,19 +213,20 @@ class MySQLIncrementalDumperTest {
     
     @Test
     void assertPlaceholderEvent() throws ReflectiveOperationException {
-        
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 AbstractBinlogEvent.class), incrementalDumper, new PlaceholderEvent());
-        List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 AbstractBinlogEvent.class),
+                incrementalDumper, new PlaceholderEvent());
         assertThat(actual.size(), is(1));
-        assertThat(actual.get(0), instanceOf(PlaceholderRecord.class));
     }
     
     @Test
     void assertRowsEventFiltered() throws ReflectiveOperationException {
         WriteRowsEvent rowsEvent = new WriteRowsEvent();
-        rowsEvent.setDatabaseName("unknown_database");
-        
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 AbstractBinlogEvent.class), incrementalDumper, rowsEvent);
-        List<Record> actual = channel.fetchRecords(1, 0, TimeUnit.SECONDS);
+        rowsEvent.setDatabaseName("test");
+        rowsEvent.setTableName("t_order");
+        rowsEvent.setAfterRows(Collections.singletonList(new 
Serializable[]{1}));
+        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 AbstractBinlogEvent.class),
+                incrementalDumper, rowsEvent);
         assertThat(actual.size(), is(1));
-        assertThat(actual.get(0), instanceOf(PlaceholderRecord.class));
+        assertThat(actual.get(0), instanceOf(DataRecord.class));
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index dcc3972be7d..7d9ffb84459 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -69,7 +69,7 @@ class MySQLClientTest {
     @SuppressWarnings("unchecked")
     @BeforeEach
     void setUp() throws InterruptedException {
-        mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306, 
"username", "password"));
+        mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306, 
"username", "password"), false);
         when(channel.pipeline()).thenReturn(pipeline);
         when(channel.isOpen()).thenReturn(true);
         when(channel.close()).thenReturn(channelFuture);
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 73bb4d7a18f..e0fb747d164 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
 import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
@@ -70,7 +71,7 @@ class MySQLBinlogEventPacketDecoderTest {
     
     @BeforeEach
     void setUp() throws NoSuchFieldException, IllegalAccessException {
-        binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4, new 
ConcurrentHashMap<>());
+        binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4, new 
ConcurrentHashMap<>(), true);
         binlogContext = (BinlogContext) 
Plugins.getMemberAccessor().get(MySQLBinlogEventPacketDecoder.class.getDeclaredField("binlogContext"),
 binlogEventPacketDecoder);
         
when(channelHandlerContext.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()).thenReturn(StandardCharsets.UTF_8);
         columnDefs = Lists.newArrayList(new 
MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONGLONG), new 
MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONG),
@@ -124,13 +125,15 @@ class MySQLBinlogEventPacketDecoderTest {
         ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
         // the hex data is from INSERT INTO t_order(order_id, user_id, status, 
t_numeric) VALUES (1, 1, 'SUCCESS',null);
         
byteBuf.writeBytes(StringUtil.decodeHexDump("007a36a9621e0100000038000000bb7c000000007b00000000000100020004ff08010000000000000001000000075355434345535365eff9ff"));
+        
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
         binlogContext.getTableMap().put(123L, tableMapEventPacket);
         when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertThat(decodedEvents.size(), is(1));
-        assertThat(decodedEvents.get(0), instanceOf(WriteRowsEvent.class));
-        WriteRowsEvent actual = (WriteRowsEvent) decodedEvents.get(0);
+        LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
+        assertThat(actualEventList.get(0), instanceOf(WriteRowsEvent.class));
+        WriteRowsEvent actual = (WriteRowsEvent) actualEventList.get(0);
         assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
     }
     
@@ -140,13 +143,15 @@ class MySQLBinlogEventPacketDecoderTest {
         // the hex data is from update t_order set status = 'updated' where 
order_id = 1;
         
byteBuf.writeBytes(StringUtil.decodeHexDump("00cb38a9621f010000004e0000000c7e000000007b00000000000100020004ffff08010000000000000001000000075355434345535308010000000000000001000000077570"
                 + "6461746564e78cee6c"));
+        
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
         binlogContext.getTableMap().put(123L, tableMapEventPacket);
         when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertThat(decodedEvents.size(), is(1));
-        assertThat(decodedEvents.get(0), instanceOf(UpdateRowsEvent.class));
-        UpdateRowsEvent actual = (UpdateRowsEvent) decodedEvents.get(0);
+        LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
+        assertThat(actualEventList.get(0), instanceOf(UpdateRowsEvent.class));
+        UpdateRowsEvent actual = (UpdateRowsEvent) actualEventList.get(0);
         assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
         assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("updated".getBytes()), null}));
     }
@@ -156,13 +161,16 @@ class MySQLBinlogEventPacketDecoderTest {
         ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
         // delete from t_order where order_id = 1;
         
byteBuf.writeBytes(StringUtil.decodeHexDump("002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5"));
+        
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
         binlogContext.getTableMap().put(116L, tableMapEventPacket);
         when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertThat(decodedEvents.size(), is(1));
-        assertThat(decodedEvents.get(0), instanceOf(DeleteRowsEvent.class));
-        DeleteRowsEvent actual = (DeleteRowsEvent) decodedEvents.get(0);
+        LinkedList<?> actualEventList = (LinkedList<?>) decodedEvents.get(0);
+        assertThat(actualEventList.get(0), instanceOf(DeleteRowsEvent.class));
+        assertThat(actualEventList.get(1), instanceOf(XidEvent.class));
+        DeleteRowsEvent actual = (DeleteRowsEvent) actualEventList.get(0);
         assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
     }
     
@@ -171,6 +179,7 @@ class MySQLBinlogEventPacketDecoderTest {
         ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
         byte[] completeData = 
StringUtil.decodeHexDump("002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5");
         byteBuf.writeBytes(completeData);
+        
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
         // write incomplete event data
         byteBuf.writeBytes(StringUtil.decodeHexDump("3400"));
         List<Object> decodedEvents = new LinkedList<>();
@@ -178,7 +187,6 @@ class MySQLBinlogEventPacketDecoderTest {
         when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertThat(decodedEvents.size(), is(1));
-        assertThat(byteBuf.readerIndex(), is(completeData.length));
     }
     
     @Test
@@ -186,6 +194,7 @@ class MySQLBinlogEventPacketDecoderTest {
         ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
         byte[] completeData = 
StringUtil.decodeHexDump("002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5");
         byteBuf.writeBytes(completeData);
+        
byteBuf.writeBytes(StringUtil.decodeHexDump("006acb656410010000001f000000fa29000000001643000000000000b13f8340"));
         byte[] notCompleteData = 
StringUtil.decodeHexDump("00cb38a962130100000041000000be7d000000007b000000000001000464735f310009745f6f726465725f31000408030f");
         byteBuf.writeBytes(notCompleteData);
         List<Object> decodedEvents = new LinkedList<>();
@@ -193,6 +202,5 @@ class MySQLBinlogEventPacketDecoderTest {
         when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, 
decodedEvents);
         assertThat(decodedEvents.size(), is(1));
-        assertThat(byteBuf.readerIndex(), is(completeData.length));
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 52f5f86b081..e438f3905c6 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExe
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
@@ -44,6 +45,7 @@ import org.opengauss.replication.PGReplicationStream;
 
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -64,7 +66,7 @@ public final class OpenGaussWALDumper extends 
AbstractLifecycleExecutor implemen
     
     private final boolean decodeWithTX;
     
-    private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+    private List<AbstractRowEvent> rowEvents = new LinkedList<>();
     
     public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final 
IngestPosition position,
                               final PipelineChannel channel, final 
PipelineTableMetaDataLoader metaDataLoader) {
@@ -115,29 +117,31 @@ public final class OpenGaussWALDumper extends 
AbstractLifecycleExecutor implemen
     }
     
     private void processEventWithTX(final AbstractWALEvent event) {
-        if (event instanceof AbstractRowEvent) {
-            rowEvents.add((AbstractRowEvent) event);
+        if (event instanceof BeginTXEvent) {
             return;
         }
-        if (event instanceof BeginTXEvent) {
-            rowEvents.clear();
+        if (event instanceof AbstractRowEvent) {
+            rowEvents.add((AbstractRowEvent) event);
             return;
         }
         if (event instanceof CommitTXEvent) {
             Long csn = ((CommitTXEvent) event).getCsn();
+            List<Record> records = new LinkedList<>();
             for (AbstractRowEvent each : rowEvents) {
                 each.setCsn(csn);
-                channel.pushRecord(walEventConverter.convert(each));
+                records.add(walEventConverter.convert(each));
             }
+            records.add(walEventConverter.convert(event));
+            channel.pushRecords(records);
+            rowEvents = new LinkedList<>();
         }
-        channel.pushRecord(walEventConverter.convert(event));
     }
     
     private void processEventIgnoreTX(final AbstractWALEvent event) {
         if (event instanceof BeginTXEvent) {
             return;
         }
-        channel.pushRecord(walEventConverter.convert(event));
+        
channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 88d8b70caa5..d5da268daf1 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExe
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
@@ -33,7 +34,10 @@ import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Deco
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.postgresql.jdbc.PgConnection;
@@ -42,6 +46,10 @@ import org.postgresql.replication.PGReplicationStream;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * PostgreSQL WAL dumper.
@@ -58,6 +66,10 @@ public final class PostgreSQLWALDumper extends 
AbstractLifecycleExecutor impleme
     
     private final PostgreSQLLogicalReplication logicalReplication;
     
+    private final boolean decodeWithTX;
+    
+    private List<AbstractRowEvent> rowEvents = new LinkedList<>();
+    
     public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final 
IngestPosition position,
                                final PipelineChannel channel, final 
PipelineTableMetaDataLoader metaDataLoader) {
         
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
@@ -67,6 +79,7 @@ public final class PostgreSQLWALDumper extends 
AbstractLifecycleExecutor impleme
         this.channel = channel;
         walEventConverter = new WALEventConverter(dumperConfig, 
metaDataLoader);
         logicalReplication = new PostgreSQLLogicalReplication();
+        this.decodeWithTX = dumperConfig.isDecodeWithTX();
     }
     
     @SneakyThrows(InterruptedException.class)
@@ -86,13 +99,45 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            rowEvents = new ArrayList<>();
+            return;
+        }
+        if (event instanceof AbstractRowEvent) {
+            rowEvents.add((AbstractRowEvent) event);
+            return;
+        }
+        if (event instanceof CommitTXEvent) {
+            List<Record> records = new LinkedList<>();
+            for (AbstractWALEvent each : rowEvents) {
+                records.add(walEventConverter.convert(each));
+            }
+            records.add(walEventConverter.convert(event));
+            channel.pushRecords(records);
+            // Release the flushed rows immediately so they are not retained, or pushed again, before the next BEGIN arrives.
+            rowEvents.clear();
+        }
+    }
+    
+    private void processEventIgnoreTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            return;
+        }
+        channel.pushRecords(Collections.singletonList(walEventConverter.convert(event)));
+    }
+    
     @Override
     protected void doStop() {
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 330c60e7a05..40729839477 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -23,6 +23,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -45,7 +47,16 @@ public final class TestDecodingPlugin implements DecodingPlugin {
     
     @Override
     public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
-        AbstractWALEvent result = "table".equals(readEventType(data)) ? readTableEvent(data) : new PlaceholderEvent();
+        String type = readEventType(data);
+        AbstractWALEvent result;
+        if (type.startsWith("BEGIN")) {
+            result = new BeginTXEvent(Long.parseLong(readNextSegment(data)));
+        } else if (type.startsWith("COMMIT")) {
+            result = new CommitTXEvent(Long.parseLong(readNextSegment(data)), null);
+        } else {
+            result = "table".equals(type) ? readTableEvent(data) : new PlaceholderEvent();
+        }
+        // Stamp every event, including BEGIN and COMMIT, so none leaves this method without its log sequence number.
         result.setLogSequenceNumber(logSequenceNumber);
         return result;
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 36689a5d87d..c823fc41844 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
@@ -76,14 +76,14 @@ class PostgreSQLWALDumperTest {
     
     private PostgreSQLWALDumper walDumper;
     
-    private MultiplexMemoryPipelineChannel channel;
+    private SimpleMemoryPipelineChannel channel;
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
     @BeforeEach
     void setUp() {
         position = new WALPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
-        channel = new MultiplexMemoryPipelineChannel(1, 10000, new 
EmptyAckCallback());
+        channel = new SimpleMemoryPipelineChannel(10000, new 
EmptyAckCallback());
         String jdbcUrl = 
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL";
         String username = "root";
         String password = "root";
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index 1f190e36cbc..a02398e7e49 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -154,7 +155,7 @@ class DataSourceImporterTest {
     private List<Record> mockRecords(final DataRecord dataRecord) {
         List<Record> result = new LinkedList<>();
         result.add(dataRecord);
-        result.add(new FinishedRecord(new PlaceholderPosition()));
+        result.add(new FinishedRecord(new FinishedPosition()));
         return result;
     }
     

Reply via email to