This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5eebfdb0de4 Add CDC server push data record implementation and refactor pipeline importer (#23168)
5eebfdb0de4 is described below

commit 5eebfdb0de4091e2dbcb8326bed976bceaec4763
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Jan 2 22:26:25 2023 +0800

    Add CDC server push data record implementation and refactor pipeline importer (#23168)
    
    * Add CDC data record result implementation and refactor pipeline importer
    
    * Fix code style and improve
    
    * Add blob value
    
    * Fix code style
---
 .../api/config/ingest/DumperConfiguration.java     |   2 +
 .../pipeline/spi/importer/ImporterCreator.java     |   4 +-
 .../data/pipeline/spi/importer/ImporterType.java}  |  23 +--
 .../data/pipeline/cdc/client/CDCClient.java        |   2 +-
 .../client/handler/SubscriptionRequestHandler.java |  18 +-
 .../client/parameter/StartCDCClientParameter.java  |   2 +
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  34 +---
 .../api/pojo/CreateSubscriptionJobParameter.java   |   2 +
 .../cdc/config/job/CDCJobConfiguration.java        |   2 +
 .../cdc/context/job/CDCJobItemContext.java         |  15 +-
 .../data/pipeline/cdc/core/ack/CDCAckHolder.java   |  87 ++++++++++
 .../ack/CDCAckPosition.java}                       |  30 ++--
 .../pipeline/cdc/core/importer/CDCImporter.java    |  71 +++++++-
 .../cdc/core/importer/CDCImporterCreator.java      |   5 +-
 .../importer/connector/CDCImporterConnector.java   | 189 ++++++++++++++++++++-
 .../generator/DataRecordComparatorGenerator.java   |  44 +++++
 .../data/pipeline/cdc/util/CDCDataRecordUtil.java  | 110 ++++++++++++
 .../pipeline/cdc/util/ColumnValueConvertUtil.java  | 161 ++++++++++++++++++
 .../cdc/util/DataRecordResultConvertUtil.java      |  64 +++++++
 .../cdc/yaml/job/YamlCDCJobConfiguration.java      |   2 +
 .../yaml/job/YamlCDCJobConfigurationSwapper.java   |   3 +-
 .../pipeline/cdc/core/ack/CDCAckHolderTest.java    |  64 +++++++
 .../cdc/core/importer/CDCImporterCreatorTest.java  |   7 +-
 .../DataRecordComparatorGeneratorTest.java         |  55 ++++++
 .../pipeline/cdc/util/CDCDataRecordUtilTest.java   |  71 ++++++++
 .../cdc/util/ColumnValueConvertUtilTest.java       | 100 +++++++++++
 .../src/main/proto/CDCRequestProtocol.proto        |  16 +-
 .../src/main/proto/CDCResponseProtocol.proto       |  19 ++-
 .../core/importer/DataSourceImporterCreator.java   |   3 +-
 .../data/pipeline/core/record/RecordUtil.java      |  19 +++
 .../data/pipeline/core/task/IncrementalTask.java   |   4 +-
 .../data/pipeline/core/task/InventoryTask.java     |  20 +--
 .../OpenGaussIncrementalDumperCreator.java         |   2 +-
 .../opengauss/ingest/OpenGaussWALDumper.java       |   4 +-
 .../backend/handler/cdc/CDCBackendHandler.java     |  61 +++++--
 .../frontend/netty/CDCChannelInboundHandler.java   |  32 +++-
 .../core/fixture/FixtureImporterCreator.java       |   3 +-
 .../data/pipeline/core/record/RecordUtilTest.java  |  21 ++-
 38 files changed, 1236 insertions(+), 135 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index c21e22f3343..145828c2052 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -50,6 +50,8 @@ public class DumperConfiguration {
     
     private TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
+    private boolean decodeWithTX;
+    
     /**
      * Get logic table name.
      *
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
index 38d2e8ba275..2e92ef36273 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
@@ -38,7 +38,9 @@ public interface ImporterCreator extends TypedSPI {
      * @param importerConnector importer connector
      * @param channel channel
      * @param jobProgressListener job progress listener
+     * @param importerType importer type
      * @return importer
      */
-    Importer createImporter(ImporterConfiguration importerConfig, ImporterConnector importerConnector, PipelineChannel channel, PipelineJobProgressListener jobProgressListener);
+    Importer createImporter(ImporterConfiguration importerConfig, ImporterConnector importerConnector, PipelineChannel channel, PipelineJobProgressListener jobProgressListener,
+                            ImporterType importerType);
 }
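
[Callers of this SPI now pass the importer type explicitly. A minimal usage sketch, assuming a creator instance resolved through the usual TypedSPI lookup:

    Importer importer = importerCreator.createImporter(importerConfig, importerConnector, channel, jobProgressListener, ImporterType.INCREMENTAL);
]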
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterType.java
similarity index 59%
copy from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterType.java
index ba7a7e40b2c..7354da9e80c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterType.java
@@ -15,27 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
-
-import io.netty.channel.Channel;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+package org.apache.shardingsphere.data.pipeline.spi.importer;
 
 /**
- * CDC importer connector.
+ * Importer type.
  */
-@RequiredArgsConstructor
-public final class CDCImporterConnector implements ImporterConnector {
-    
-    private final Channel channel;
+public enum ImporterType {
     
-    @Override
-    public Object getConnector() {
-        return channel;
-    }
+    INVENTORY,
     
-    @Override
-    public String getType() {
-        return "CDC";
-    }
+    INCREMENTAL
 }
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 014fa995453..4f2acf1fd90 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -90,7 +90,7 @@ public final class CDCClient {
                         channel.pipeline().addLast(new ProtobufEncoder());
                         channel.pipeline().addLast(new LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
                         channel.pipeline().addLast(new SubscriptionRequestHandler(parameter.getDatabase(), parameter.getSubscriptionName(), parameter.getSubscribeTables(),
-                                parameter.getSubscriptionMode()));
+                                parameter.getSubscriptionMode(), parameter.isIncrementalGlobalOrderly()));
                     }
                 });
         ChannelFuture future = bootstrap.connect(address, port).sync();
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
index e1aa325a31d..17855103394 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnect
 import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
@@ -33,6 +34,8 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscr
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 
 import java.util.List;
 
@@ -51,11 +54,13 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
     
     private final SubscriptionMode subscribeMode;
     
+    private final boolean incrementalGlobalOrderly;
+    
     @Override
     public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
         if (evt instanceof CreateSubscriptionEvent) {
             CreateSubscriptionRequest createSubscriptionRequest = CreateSubscriptionRequest.newBuilder().setDatabase(database).setSubscriptionMode(subscribeMode).setSubscriptionName(subscriptionName)
-                    .addAllTableNames(subscribeTables).build();
+                    .addAllTableNames(subscribeTables).setIncrementalGlobalOrderly(incrementalGlobalOrderly).build();
             CDCRequest request = CDCRequest.newBuilder().setCreateSubscription(createSubscriptionRequest).setRequestId(RequestIdUtil.generateRequestId()).build();
             ctx.writeAndFlush(request);
         }
@@ -64,7 +69,7 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
         CDCResponse response = (CDCResponse) msg;
-        if (response.getStatus() == Status.FAILED) {
+        if (response.getStatus() != Status.SUCCEED) {
             log.error("received error response {}", msg);
         }
         ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
@@ -77,7 +82,7 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
         } else if (connectionContext.getStatus() == ClientConnectionStatus.CREATING_SUBSCRIPTION) {
             startSubscription(response, connectionContext);
         } else {
-            subscribeDataRecords(ctx);
+            subscribeDataRecords(ctx, response.getDataRecordResult());
         }
     }
     
@@ -94,8 +99,11 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
         connectionContext.setStatus(ClientConnectionStatus.SUBSCRIBING);
     }
     
-    private void subscribeDataRecords(final ChannelHandlerContext ctx) {
-        // TODO to be implemented
+    private void subscribeDataRecords(final ChannelHandlerContext ctx, final DataRecordResult result) {
+        List<Record> recordsList = result.getRecordsList();
+        log.debug("received records {}", recordsList);
+        // TODO data needs to be processed, such as writing to a database
+        ctx.channel().writeAndFlush(CDCRequest.newBuilder().setAckRequest(AckRequest.newBuilder().setAckId(result.getAckId()).build()).build());
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index f18bcbd6162..90d0ab31c6e 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -46,4 +46,6 @@ public final class StartCDCClientParameter {
     private String subscriptionName;
     
     private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
+    
+    private boolean incrementalGlobalOrderly;
 }
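
[On the client side, globally ordered incremental push is opted into through the new parameter. A sketch; field values are illustrative and setters are assumed to be Lombok-generated:

    StartCDCClientParameter parameter = new StartCDCClientParameter();
    parameter.setDatabase("sharding_db");
    parameter.setSubscriptionName("subscription_1");
    parameter.setSubscriptionMode(SubscriptionMode.INCREMENTAL);
    parameter.setIncrementalGlobalOrderly(true);
]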
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bc52ce5bd3b..336a3cc75c2 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -28,7 +28,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -36,9 +35,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
@@ -84,7 +81,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -112,6 +108,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         yamlJobConfig.setTableNames(event.getSubscribeTableNames());
         yamlJobConfig.setSubscriptionName(event.getSubscriptionName());
         yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode());
+        yamlJobConfig.setDecodeWithTX(event.isDecodeWithTX());
         ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase());
         yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
         List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtil.convertDataNodesToLines(event.getDataNodesMap());
@@ -182,21 +179,22 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(Collections.emptyMap());
         String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
         StandardPipelineDataSourceConfiguration actualDataSourceConfiguration = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig.getJobId(), dataSourceName, actualDataSourceConfiguration, tableNameMap, tableNameSchemaNameMapping);
+        DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, dataSourceName, actualDataSourceConfiguration, tableNameMap, tableNameSchemaNameMapping);
         ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getTableNames(), tableNameSchemaNameMapping);
         CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig, importerConfig);
         log.debug("buildTaskConfiguration, result={}", result);
         return result;
     }
     
-    private static DumperConfiguration buildDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSourceConfig,
+    private static DumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSourceConfig,
                                                                 final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         DumperConfiguration result = new DumperConfiguration();
-        result.setJobId(jobId);
+        result.setJobId(jobConfig.getJobId());
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(sourceDataSourceConfig);
         result.setTableNameMap(tableNameMap);
         result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+        result.setDecodeWithTX(jobConfig.isDecodeWithTX());
         return result;
     }
     
@@ -232,31 +230,15 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         return new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration((CDCJobConfiguration) jobConfig);
     }
     
-    @Override
-    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
-        // TODO to be implemented
-    }
-    
-    @Override
-    public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
-        // TODO to be implemented
-        return Optional.empty();
-    }
-    
-    @Override
-    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        // TODO to be implemented
-    }
-    
     @Override
     protected PipelineJobInfo getJobInfo(final String jobId) {
-        // TODO to be implemented
-        return null;
+        throw new UnsupportedOperationException();
     }
     
     @Override
     public void rollback(final String jobId) throws SQLException {
-        // TODO to be implemented
+        stop(jobId);
+        dropJob(jobId);
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
index 02ddd7cb2f6..79c30206e54 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
@@ -40,4 +40,6 @@ public final class CreateSubscriptionJobParameter {
     private final String subscriptionMode;
     
     private final Map<String, List<DataNode>> dataNodesMap;
+    
+    private final boolean decodeWithTX;
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index de4358df453..d4c628ac874 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -50,6 +50,8 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
     
     private final List<JobDataNodeLine> jobShardingDataNodes;
     
+    private final boolean decodeWithTX;
+    
     private final int concurrency;
     
     private final int retryTimes;
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index c2702337ec6..8d63276264e 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguratio
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -40,6 +41,7 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterCo
 
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * CDC job item context.
@@ -72,6 +74,10 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
     
     private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
     
+    private final AtomicLong processedRecordsCount = new AtomicLong(0);
+    
+    private final AtomicLong inventoryRecordsCount = new AtomicLong(0);
+    
     private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
         
         @Override
@@ -100,7 +106,8 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
     
     @Override
     public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
-        // TODO to be implemented
+        processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+        PipelineJobProgressPersistService.notifyPersist(jobConfig.getJobId(), shardingItem);
     }
     
     /**
@@ -126,16 +133,16 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
     
     @Override
     public long getProcessedRecordsCount() {
-        throw new UnsupportedOperationException();
+        return processedRecordsCount.get();
     }
     
     @Override
     public void updateInventoryRecordsCount(final long recordsCount) {
-        // TODO to be implemented
+        inventoryRecordsCount.addAndGet(recordsCount);
     }
     
     @Override
     public long getInventoryRecordsCount() {
-        return 0;
+        return inventoryRecordsCount.get();
     }
 }
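
[With the counters in place, progress reporting is a matter of invoking the listener. A sketch of how an importer might report a processed batch (batchRecordCount is a hypothetical variable; the parameter class comes from the pipeline API):

    // Adds the batch size to processedRecordsCount and schedules a persist of the job item progress.
    jobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(batchRecordCount));
]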
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
new file mode 100644
index 00000000000..134027c25c5
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * CDC ack holder.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CDCAckHolder {
+    
+    private static final CDCAckHolder INSTANCE = new CDCAckHolder();
+    
+    private final Map<String, Map<CDCImporter, CDCAckPosition>> ackIdImporterMap = new ConcurrentHashMap<>();
+    
+    /**
+     * Ack of CDC.
+     *
+     * @param ackId ack id
+     */
+    public void ack(final String ackId) {
+        Map<CDCImporter, CDCAckPosition> importerDataRecordMap = ackIdImporterMap.remove(ackId);
+        if (null != importerDataRecordMap) {
+            importerDataRecordMap.forEach(CDCImporter::ackWithLastDataRecord);
+        }
+    }
+    
+    /**
+     * Bind ack id.
+     *
+     * @param importerDataRecordMap importer data record map
+     * @return ack id
+     */
+    public String bindAckIdWithPosition(final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
+        String result = generateAckId();
+        // TODO it might need to be persisted to the registry center in cluster mode.
+        ackIdImporterMap.put(result, importerDataRecordMap);
+        return result;
+    }
+    
+    private String generateAckId() {
+        return "ACK-" + UUID.randomUUID();
+    }
+    
+    /**
+     * Clean up.
+     *
+     * @param cdcImporter CDC importer
+     */
+    public void cleanUp(final CDCImporter cdcImporter) {
+        if (ackIdImporterMap.isEmpty()) {
+            return;
+        }
+        ackIdImporterMap.entrySet().removeIf(entry -> entry.getValue().containsKey(cdcImporter));
+    }
+    
+    /**
+     * Get instance.
+     *
+     * @return CDC ack holder
+     */
+    public static CDCAckHolder getInstance() {
+        return INSTANCE;
+    }
+}
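
[The holder ties a pushed batch to the importers that produced it. A rough sketch of the server-side round trip; the importer, lastRecord and dataRecordCount variables are hypothetical stand-ins:

    // Before pushing a batch: bind its positions to a fresh ack id.
    Map<CDCImporter, CDCAckPosition> positions = new HashMap<>();
    positions.put(importer, new CDCAckPosition(lastRecord, dataRecordCount));
    String ackId = CDCAckHolder.getInstance().bindAckIdWithPosition(positions);
    // ... push the DataRecordResult carrying ackId to the client ...
    // When the client's AckRequest comes back: release the bound positions.
    CDCAckHolder.getInstance().ack(ackId);
]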
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
similarity index 60%
copy from kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
index 02ddd7cb2f6..469aa3870bb 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
@@ -15,29 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.api.pojo;
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-
-import java.util.List;
-import java.util.Map;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
 /**
- * Create subscription job parameter.
+ * CDC ack position.
  */
-@RequiredArgsConstructor
 @Getter
-public final class CreateSubscriptionJobParameter {
-    
-    private final String database;
+@AllArgsConstructor
+public final class CDCAckPosition {
     
-    private final List<String> subscribeTableNames;
+    @Setter
+    private Record lastRecord;
     
-    private final String subscriptionName;
+    @Setter
+    private int dataRecordCount;
     
-    private final String subscriptionMode;
+    private final long createTimeMills;
     
-    private final Map<String, List<DataNode>> dataNodesMap;
+    public CDCAckPosition(final Record lastRecord, final int dataRecordCount) {
+        this(lastRecord, dataRecordCount, System.currentTimeMillis());
+    }
 }
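
[The two-argument constructor simply stamps the current time; e.g.:

    CDCAckPosition position = new CDCAckPosition(lastRecord, 3); // createTimeMills = System.currentTimeMillis()
]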
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index e9fd10e3ac4..5b7f7c08272 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -19,17 +19,33 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
 
 import lombok.AccessLevel;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
  * CDC importer.
  */
+@Slf4j
public final class CDCImporter extends AbstractLifecycleExecutor implements Importer {
     
     @Getter(AccessLevel.PROTECTED)
@@ -37,24 +53,71 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
     
     private final PipelineChannel channel;
     
+    private final CDCImporterConnector importerConnector;
+    
     private final PipelineJobProgressListener jobProgressListener;
     
+    @Getter
+    private final ImporterType importerType;
+    
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
-    public CDCImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel, final PipelineJobProgressListener jobProgressListener) {
+    public CDCImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel, final PipelineJobProgressListener jobProgressListener,
+                       final ImporterType importerType) {
         this.importerConfig = importerConfig;
-        rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
+        rateLimitAlgorithm = null == importerConfig ? null : importerConfig.getRateLimitAlgorithm();
         this.channel = channel;
+        this.importerConnector = (CDCImporterConnector) importerConnector;
         this.jobProgressListener = jobProgressListener;
+        this.importerType = importerType;
     }
     
     @Override
     protected void runBlocking() {
-        // TODO to be implemented
+        int batchSize = importerConfig.getBatchSize();
+        if (ImporterType.INCREMENTAL == importerType) {
+            importerConnector.sendIncrementalStartEvent(this, batchSize);
+        }
+        while (isRunning()) {
+            List<Record> records = channel.fetchRecords(batchSize, 3);
+            if (null != records && !records.isEmpty()) {
+                List<Record> recordList = records.stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+                try {
+                    processDataRecords(recordList);
+                } catch (final SQLException ex) {
+                    log.error("process data records failed", ex);
+                    throw new RuntimeException(ex);
+                }
+                if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
+                    break;
+                }
+            }
+        }
+    }
+    
+    private void processDataRecords(final List<Record> recordList) throws SQLException {
+        if (null == recordList || recordList.isEmpty()) {
+            return;
+        }
+        if (null != rateLimitAlgorithm) {
+            rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+        }
+        importerConnector.write(recordList, this, importerType);
+    }
+    
+    /**
+     * Ack with last data record.
+     *
+     * @param cdcAckPosition CDC ack position
+     */
+    public void ackWithLastDataRecord(final CDCAckPosition cdcAckPosition) {
+        channel.ack(Collections.singletonList(cdcAckPosition.getLastRecord()));
+        jobProgressListener.onProgressUpdated(new PipelineJobProgressUpdatedParameter(cdcAckPosition.getDataRecordCount()));
     }
     
     @Override
     protected void doStop() {
-        // TODO to be implemented
+        importerConnector.clean();
+        CDCAckHolder.getInstance().cleanUp(this);
     }
 }
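
[When the client acks, CDCAckHolder calls back into ackWithLastDataRecord, which acknowledges the channel up to the last pushed record and reports progress. Compressed into one sketch:

    importer.ackWithLastDataRecord(new CDCAckPosition(lastRecord, dataRecordCount));
    // -> channel.ack(Collections.singletonList(lastRecord)) and jobProgressListener.onProgressUpdated(...)
]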
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
index d27023370aa..9f1034ef0eb 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
 /**
@@ -31,8 +32,8 @@ public final class CDCImporterCreator implements ImporterCreator {
     
     @Override
     public Importer createImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
-                                   final PipelineJobProgressListener jobProgressListener) {
-        return new CDCImporter(importerConfig, importerConnector, channel, jobProgressListener);
+                                   final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
+        return new CDCImporter(importerConfig, importerConnector, channel, jobProgressListener, importerType);
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index ba7a7e40b2c..257f2324d71 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -18,24 +18,211 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
 
 import io.netty.channel.Channel;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtil;
+import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * CDC importer connector.
  */
-@RequiredArgsConstructor
+@Slf4j
 public final class CDCImporterConnector implements ImporterConnector {
     
+    private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
+    
+    private final Lock lock = new ReentrantLock();
+    
+    private final Condition condition = lock.newCondition();
+    
+    @Setter
+    private volatile boolean running = true;
+    
+    @Getter
+    private final String database;
+    
     private final Channel channel;
     
+    private final int jobShardingCount;
+    
+    private final Comparator<DataRecord> dataRecordComparator;
+    
+    private final Map<String, String> tableNameSchemaMap = new HashMap<>();
+    
+    private final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap = new ConcurrentHashMap<>();
+    
+    private final AtomicInteger runningIncrementalTaskCount = new AtomicInteger(0);
+    
+    private Thread incrementalImporterTask;
+    
+    public CDCImporterConnector(final Channel channel, final String database, final int jobShardingCount, final List<String> tableNames, final Comparator<DataRecord> dataRecordComparator) {
+        this.channel = channel;
+        this.database = database;
+        this.jobShardingCount = jobShardingCount;
+        tableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
+            String[] split = each.split("\\.");
+            tableNameSchemaMap.put(split[1], split[0]);
+        });
+        this.dataRecordComparator = dataRecordComparator;
+    }
+    
     @Override
     public Object getConnector() {
         return channel;
     }
     
+    /**
+     * Write data record into channel.
+     *
+     * @param recordList data records
+     * @param cdcImporter CDC importer
+     * @param importerType importer type
+     */
+    public void write(final List<Record> recordList, final CDCImporter cdcImporter, final ImporterType importerType) {
+        if (recordList.isEmpty()) {
+            return;
+        }
+        if (ImporterType.INVENTORY == importerType || null == dataRecordComparator) {
+            Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+            int dataRecordCount = (int) recordList.stream().filter(each -> each instanceof DataRecord).count();
+            Record lastRecord = recordList.get(recordList.size() - 1);
+            if (lastRecord instanceof FinishedRecord && 0 == dataRecordCount) {
+                cdcImporter.ackWithLastDataRecord(new CDCAckPosition(lastRecord, 0));
+                return;
+            }
+            importerDataRecordMap.put(cdcImporter, new CDCAckPosition(RecordUtil.getLastNormalRecord(recordList), dataRecordCount));
+            writeImmediately(recordList, importerDataRecordMap);
+        } else if (ImporterType.INCREMENTAL == importerType) {
+            writeIntoQueue(recordList, cdcImporter);
+        }
+    }
+    
+    private void writeImmediately(final List<? extends Record> recordList, final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
+        while (!channel.isWritable() && channel.isActive() && running) {
+            doAwait();
+        }
+        if (!channel.isActive() || !running) {
+            return;
+        }
+        List<DataRecordResult.Record> records = new LinkedList<>();
+        for (Record each : recordList) {
+            if (!(each instanceof DataRecord)) {
+                continue;
+            }
+            DataRecord dataRecord = (DataRecord) each;
+            records.add(DataRecordResultConvertUtil.convertDataRecordToRecord(database, tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
+        }
+        String ackId = CDCAckHolder.getInstance().bindAckIdWithPosition(importerDataRecordMap);
+        DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecords(records).setAckId(ackId).build();
+        channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
+    }
+    
+    private void doAwait() {
+        lock.lock();
+        try {
+            condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException ignored) {
+        } finally {
+            lock.unlock();
+        }
+    }
+    
+    @SneakyThrows(InterruptedException.class)
+    private void writeIntoQueue(final List<Record> dataRecords, final CDCImporter cdcImporter) {
+        BlockingQueue<Record> blockingQueue = incrementalRecordMap.get(cdcImporter);
+        if (null == blockingQueue) {
+            log.warn("could not find the queue to write");
+            return;
+        }
+        for (Record each : dataRecords) {
+            blockingQueue.put(each);
+        }
+    }
+    
+    /**
+     * Send incremental start event.
+     *
+     * @param cdcImporter CDC importer
+     * @param batchSize batch size
+     */
+    public void sendIncrementalStartEvent(final CDCImporter cdcImporter, final int batchSize) {
+        incrementalRecordMap.computeIfAbsent(cdcImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
+        int count = runningIncrementalTaskCount.incrementAndGet();
+        if (count < jobShardingCount || null == dataRecordComparator) {
+            return;
+        }
+        log.debug("start CDC incremental importer");
+        if (null == incrementalImporterTask) {
+            incrementalImporterTask = new Thread(new CDCIncrementalImporterTask(batchSize));
+            incrementalImporterTask.start();
+        }
+    }
+    
+    /**
+     * Clean CDC importer connector.
+     */
+    public void clean() {
+        running = false;
+        incrementalRecordMap.clear();
+    }
+    
     @Override
     public String getType() {
         return "CDC";
     }
+    
+    @RequiredArgsConstructor
+    private final class CDCIncrementalImporterTask implements Runnable {
+        
+        private final int batchSize;
+        
+        @Override
+        public void run() {
+            while (running) {
+                Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
+                List<DataRecord> dataRecords = new LinkedList<>();
+                for (int i = 0; i < batchSize; i++) {
+                    DataRecord minimumDataRecord = CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(incrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
+                    if (null == minimumDataRecord) {
+                        break;
+                    }
+                    dataRecords.add(minimumDataRecord);
+                }
+                if (dataRecords.isEmpty()) {
+                    ThreadUtil.sleep(200, TimeUnit.MILLISECONDS);
+                } else {
+                    writeImmediately(dataRecords, cdcAckPositionMap);
+                }
+            }
+        }
+    }
 }
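
[A sketch of how the backend might wire up the connector for a job; names such as nettyChannel, jobShardingCount and tableNames are illustrative:

    Comparator<DataRecord> comparator = DataRecordComparatorGenerator.generatorIncrementalComparator(databaseType);
    CDCImporterConnector connector = new CDCImporterConnector(nettyChannel, "sharding_db", jobShardingCount, tableNames, comparator);
]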
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
new file mode 100644
index 00000000000..7c467e0b910
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+
+import java.util.Comparator;
+
+/**
+ * Data record comparator generator.
+ */
+public final class DataRecordComparatorGenerator {
+    
+    /**
+     * Generate comparator.
+     *
+     * @param databaseType database type
+     * @return data record comparator
+     */
+    public static Comparator<DataRecord> generatorIncrementalComparator(final DatabaseType databaseType) {
+        if (databaseType instanceof OpenGaussDatabaseType) {
+            return Comparator.comparing(DataRecord::getCsn, Comparator.nullsFirst(Comparator.naturalOrder()));
+        }
+        // TODO MySQL and PostgreSQL are not supported yet
+        return null;
+    }
+    }
+}
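
[For openGauss, the generated comparator orders data records by commit sequence number (CSN), nulls first. A usage sketch:

    Comparator<DataRecord> comparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
    dataRecords.sort(comparator);
]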
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
new file mode 100644
index 00000000000..295d03cabfe
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * CDC data record util.
+ */
+public final class CDCDataRecordUtil {
+    
+    /**
+     * Find minimum data record and save position.
+     *
+     * @param incrementalRecordMap incremental record map
+     * @param dataRecordComparator data record comparator
+     * @param cdcAckPositionMap CDC ack position map
+     * @return minimum data record
+     */
+    public static DataRecord findMinimumDataRecordAndSavePosition(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Comparator<DataRecord> dataRecordComparator,
+                                                                  final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap) {
+        if (null == dataRecordComparator) {
+            return findMinimumDataRecordWithoutComparator(incrementalRecordMap, cdcAckPositionMap);
+        } else {
+            return findMinimumDataRecordWithComparator(incrementalRecordMap, cdcAckPositionMap, dataRecordComparator);
+        }
+    }
+    
+    private static DataRecord findMinimumDataRecordWithoutComparator(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap) {
+        for (Entry<CDCImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
+            Record record = entry.getValue().poll();
+            if (!(record instanceof DataRecord)) {
+                continue;
+            }
+            saveAckPosition(cdcAckPositionMap, entry.getKey(), record);
+            return (DataRecord) record;
+        }
+        return null;
+    }
+    
+    private static void saveAckPosition(final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap, final CDCImporter cdcImporter, final Record record) {
+        if (null == cdcAckPosition) {
+            cdcAckPositionMap.put(cdcImporter, new CDCAckPosition(record, 1));
+        } else {
+            cdcAckPosition.setLastRecord(record);
+            cdcAckPosition.setDataRecordCount(cdcAckPosition.getDataRecordCount() + 1);
+        }
+    }
+    
+    private static DataRecord findMinimumDataRecordWithComparator(final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap,
+                                                                  final Comparator<DataRecord> dataRecordComparator) {
+        Map<CDCImporter, DataRecord> waitSortedMap = new HashMap<>();
+        for (Entry<CDCImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
+            Record peek = entry.getValue().peek();
+            if (null == peek) {
+                continue;
+            }
+            if (peek instanceof DataRecord) {
+                waitSortedMap.put(entry.getKey(), (DataRecord) peek);
+            }
+        }
+        if (waitSortedMap.isEmpty()) {
+            return null;
+        }
+        DataRecord minRecord = null;
+        CDCImporter belongImporter = null;
+        for (Entry<CDCImporter, DataRecord> entry : waitSortedMap.entrySet()) {
+            if (null == minRecord) {
+                minRecord = entry.getValue();
+                belongImporter = entry.getKey();
+                continue;
+            }
+            if (dataRecordComparator.compare(minRecord, entry.getValue()) > 0) {
+                minRecord = entry.getValue();
+                belongImporter = entry.getKey();
+            }
+        }
+        if (null == minRecord) {
+            return null;
+        }
+        incrementalRecordMap.get(belongImporter).poll();
+        saveAckPosition(cdcAckPositionMap, belongImporter, minRecord);
+        return minRecord;
+    }
+}
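
[The util effectively performs one step of a k-way merge across the per-importer queues. One polling round might look like this sketch, with the queues assumed to be populated and comparator/ackPositions/batch as hypothetical locals:

    DataRecord next = CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(incrementalRecordMap, comparator, ackPositions);
    if (null != next) {
        batch.add(next);
    }
]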
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
new file mode 100644
index 00000000000..c62c9327748
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import com.google.gson.Gson;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Struct;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BlobValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ClobValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.LocalTimeValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.NullValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.Date;
+
+/**
+ * Column value convert util.
+ */
+@Slf4j
+public final class ColumnValueConvertUtil {
+    
+    private static final Gson GSON = new Gson();
+    
+    /**
+     * Convert java object to protobuf message.
+     *
+     * @param object object
+     * @return protobuf message
+     */
+    public static Message convertToProtobufMessage(final Object object) {
+        if (null == object) {
+            return NullValue.newBuilder().build();
+        }
+        if (object instanceof Integer) {
+            return Int32Value.newBuilder().setValue((int) object).build();
+        }
+        if (object instanceof Short) {
+            return Int32Value.newBuilder().setValue(((Short) object).intValue()).build();
+        }
+        if (object instanceof Byte) {
+            return Int32Value.newBuilder().setValue(((Byte) object).intValue()).build();
+        }
+        if (object instanceof Long) {
+            return Int64Value.newBuilder().setValue((long) object).build();
+        }
+        if (object instanceof BigInteger) {
+            return BigIntegerValue.newBuilder().setValue(ByteString.copyFrom(((BigInteger) object).toByteArray())).build();
+        }
+        if (object instanceof Float) {
+            return FloatValue.newBuilder().setValue((float) object).build();
+        }
+        if (object instanceof Double) {
+            return DoubleValue.newBuilder().setValue((double) object).build();
+        }
+        if (object instanceof BigDecimal) {
+            return BigDecimalValue.newBuilder().setValue(object.toString()).build();
+        }
+        if (object instanceof String) {
+            return StringValue.newBuilder().setValue(object.toString()).build();
+        }
+        if (object instanceof Boolean) {
+            return BoolValue.newBuilder().setValue((boolean) object).build();
+        }
+        if (object instanceof byte[]) {
+            return BytesValue.newBuilder().setValue(ByteString.copyFrom((byte[]) object)).build();
+        }
+        if (object instanceof Date) {
+            return convertToProtobufTimestamp((Date) object);
+        }
+        if (object instanceof LocalDateTime) {
+            return convertToProtobufTimestamp(Timestamp.valueOf((LocalDateTime) object));
+        }
+        if (object instanceof LocalDate) {
+            return convertToProtobufTimestamp(Timestamp.valueOf(((LocalDate) object).atStartOfDay()));
+        }
+        if (object instanceof LocalTime) {
+            LocalTime localTime = (LocalTime) object;
+            return LocalTimeValue.newBuilder().setValue(localTime.toString()).build();
+        }
+        if (object instanceof ZonedDateTime) {
+            return convertToProtobufTimestamp(Timestamp.valueOf(((ZonedDateTime) object).toLocalDateTime()));
+        }
+        if (object instanceof Instant) {
+            Instant instant = (Instant) object;
+            return com.google.protobuf.Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).setNanos(instant.getNano()).build();
+        }
+        if (object instanceof Clob) {
+            Clob clob = (Clob) object;
+            try {
+                return ClobValue.newBuilder().setValue(clob.getSubString(1, (int) clob.length())).build();
+            } catch (final SQLException ex) {
+                log.error("get clob length failed", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+        if (object instanceof Blob) {
+            Blob blob = (Blob) object;
+            try {
+                return BlobValue.newBuilder().setValue(ByteString.copyFrom(blob.getBytes(1, (int) blob.length()))).build();
+            } catch (final SQLException ex) {
+                log.error("get blob bytes failed", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+        return fromJson(GSON.toJson(object));
+    }
+    
+    private static com.google.protobuf.Timestamp convertToProtobufTimestamp(final Date timestamp) {
+        long millis = timestamp.getTime();
+        return com.google.protobuf.Timestamp.newBuilder().setSeconds(millis / 1000).setNanos((int) ((millis % 1000) * 1000000)).build();
+    }
+    
+    private static Message fromJson(final String json) {
+        Builder structBuilder = Struct.newBuilder();
+        try {
+            JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder);
+        } catch (final InvalidProtocolBufferException ex) {
+            throw new RuntimeException(ex);
+        }
+        return structBuilder.build();
+    }
+}
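
Note: callers wrap each converted message in a google.protobuf.Any so heterogeneous column types can share a single map value type (see DataRecordResultConvertUtil below). A minimal pack/unpack round trip with the standard protobuf-java Any API; the bigint example value is arbitrary:

    import com.google.protobuf.Any;
    import com.google.protobuf.Int64Value;
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.shardingsphere.data.pipeline.cdc.util.ColumnValueConvertUtil;
    
    public final class AnyRoundTripSketch {
        
        public static void main(final String[] args) throws InvalidProtocolBufferException {
            // A bigint column value arrives as a Long and becomes an Int64Value.
            Any packed = Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(42L));
            // Consumers check the concrete type before unpacking.
            if (packed.is(Int64Value.class)) {
                System.out.println(packed.unpack(Int64Value.class).getValue());    // 42
            }
        }
    }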
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
new file mode 100644
index 00000000000..9108ed6af2e
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import com.google.common.base.Strings;
+import com.google.protobuf.Any;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Data record result convert util.
+ */
+public final class DataRecordResultConvertUtil {
+    
+    /**
+     * Convert data record to record.
+     *
+     * @param database database
+     * @param schema schema
+     * @param dataRecord data record
+     * @return record
+     */
+    public static Record convertDataRecordToRecord(final String database, final String schema, final DataRecord dataRecord) {
+        Map<String, Any> beforeMap = new HashMap<>();
+        Map<String, Any> afterMap = new HashMap<>();
+        for (Column column : dataRecord.getColumns()) {
+            beforeMap.put(column.getName(), Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(column.getOldValue())));
+            afterMap.put(column.getName(), Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(column.getValue())));
+        }
+        TableMetaData metaData = TableMetaData.newBuilder().setDatabase(database).setSchema(Strings.nullToEmpty(schema)).setTableName(dataRecord.getTableName()).build();
+        DataChangeType dataChangeType = DataChangeType.UNKNOWN;
+        if (IngestDataChangeType.INSERT.equals(dataRecord.getType())) {
+            dataChangeType = DataChangeType.INSERT;
+        } else if (IngestDataChangeType.UPDATE.equals(dataRecord.getType())) {
+            dataChangeType = DataChangeType.UPDATE;
+        } else if (IngestDataChangeType.DELETE.equals(dataRecord.getType())) {
+            dataChangeType = DataChangeType.DELETE;
+        }
+        return DataRecordResult.Record.newBuilder().setTableMetaData(metaData).putAllBefore(beforeMap).putAllAfter(afterMap).setDataChangeType(dataChangeType).build();
+    }
+}
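
Note: a short sketch of how the converted record is then assembled into the DataRecordResult that gets pushed to clients; dataRecord, the database/schema names, and the ack id are placeholders:

    // dataRecord: a pipeline DataRecord produced by the ingest side (placeholder variable).
    DataRecordResult.Record record = DataRecordResultConvertUtil.convertDataRecordToRecord("sharding_db", "public", dataRecord);
    DataRecordResult result = DataRecordResult.newBuilder()
            .setAckId("ack-0")    // placeholder; real ack ids are issued by CDCAckHolder
            .addRecords(record)
            .build();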
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index 28c38a814a7..9a02e7b853e 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -49,6 +49,8 @@ public final class YamlCDCJobConfiguration implements YamlPipelineJobConfigurati
     
     private List<String> jobShardingDataNodes;
     
+    private boolean decodeWithTX;
+    
     private int concurrency = 1;
     
     private int retryTimes;
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 106f9b9ab2f..14276bbed7b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -48,6 +48,7 @@ public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSw
         result.setTablesFirstDataNodes(null == data.getTablesFirstDataNodes() ? null : data.getTablesFirstDataNodes().marshal());
         List<String> jobShardingDataNodes = null == data.getJobShardingDataNodes() ? null : data.getJobShardingDataNodes().stream().map(JobDataNodeLine::marshal).collect(Collectors.toList());
         result.setJobShardingDataNodes(jobShardingDataNodes);
+        result.setDecodeWithTX(data.isDecodeWithTX());
         result.setConcurrency(data.getConcurrency());
         result.setRetryTimes(0);
         return result;
@@ -61,7 +62,7 @@ public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSw
         JobDataNodeLine tablesFirstDataNodes = null == yamlConfig.getTablesFirstDataNodes() ? null : JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
         return new CDCJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabase(), yamlConfig.getTableNames(), yamlConfig.getSubscriptionName(), yamlConfig.getSubscriptionMode(),
                 yamlConfig.getSourceDatabaseType(), (ShardingSpherePipelineDataSourceConfiguration) dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()), tablesFirstDataNodes,
-                jobShardingDataNodes, yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
+                jobShardingDataNodes, yamlConfig.isDecodeWithTX(), yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
     
     /**
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
new file mode 100644
index 00000000000..bc15f22efe3
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.infra.util.reflection.ReflectionUtil;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public final class CDCAckHolderTest {
+    
+    @Test
+    public void assertBindAckIdWithPositionAndAck() {
+        CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
+        final Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+        CDCImporter cdcImporter = mock(CDCImporter.class);
+        importerDataRecordMap.put(cdcImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0));
+        Optional<Map<String, Map<CDCImporter, CDCAckPosition>>> ackIdImporterMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdImporterMap");
+        assertTrue(ackIdImporterMap.isPresent());
+        assertTrue(ackIdImporterMap.get().isEmpty());
+        String ackId = cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
+        assertThat(ackIdImporterMap.get().size(), is(1));
+        cdcAckHolder.ack(ackId);
+        assertTrue(ackIdImporterMap.get().isEmpty());
+    }
+    
+    @Test
+    public void assertCleanUpTimeoutAckId() {
+        CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
+        final Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
+        CDCImporter cdcImporter = mock(CDCImporter.class);
+        importerDataRecordMap.put(cdcImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0, System.currentTimeMillis() - 60 * 1000 * 10));
+        cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
+        cdcAckHolder.cleanUp(cdcImporter);
+        Optional<Map<String, Map<CDCImporter, CDCAckPosition>>> actualAckIdImporterMap = ReflectionUtil.getFieldValue(cdcAckHolder, "ackIdImporterMap");
+        assertTrue(actualAckIdImporterMap.isPresent());
+        assertTrue(actualAckIdImporterMap.get().isEmpty());
+    }
+}
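
Note: the two tests above cover the ack lifecycle end to end. A sketch of the intended round trip, restricted to the API the tests call; importer and lastRecord are placeholder variables:

    Map<CDCImporter, CDCAckPosition> positions = new HashMap<>();
    positions.put(importer, new CDCAckPosition(lastRecord, 1));
    // Server side: bind the positions and carry the returned ack id in the pushed DataRecordResult.
    String ackId = CDCAckHolder.getInstance().bindAckIdWithPosition(positions);
    // When the client's AckRequest comes back, the binding is released and progress can be persisted.
    CDCAckHolder.getInstance().ack(ackId);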
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
index 811ed83d94c..bebcbaef4a0 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
 
+import io.netty.channel.Channel;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
@@ -26,8 +27,11 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.Collections;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class CDCImporterCreatorTest {
@@ -37,6 +41,7 @@ public final class CDCImporterCreatorTest {
     
     @Test
     public void assertCreateCDCImporter() {
-        assertThat(TypedSPIRegistry.getRegisteredService(ImporterCreator.class, "CDC").createImporter(importerConfig, new CDCImporterConnector(null), null, null), instanceOf(CDCImporter.class));
+        CDCImporterConnector importerConnector = new CDCImporterConnector(mock(Channel.class), "test", 1, Collections.emptyList(), null);
+        assertThat(TypedSPIRegistry.getRegisteredService(ImporterCreator.class, "CDC").createImporter(importerConfig, importerConnector, null, null, null), instanceOf(CDCImporter.class));
     }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
new file mode 100644
index 00000000000..b7248148f23
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+public final class DataRecordComparatorGeneratorTest {
+    
+    @Test
+    public void assertGeneratorIncrementalComparator() {
+        Comparator<DataRecord> dataRecordComparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
+        List<DataRecord> dataRecords = new LinkedList<>();
+        dataRecords.add(generateDataRecord(1L));
+        dataRecords.add(generateDataRecord(100L));
+        dataRecords.add(generateDataRecord(0L));
+        dataRecords.add(generateDataRecord(null));
+        dataRecords.sort(dataRecordComparator);
+        assertNull(dataRecords.get(0).getCsn());
+        assertThat(dataRecords.get(1).getCsn(), is(0L));
+        assertThat(dataRecords.get(2).getCsn(), is(1L));
+        assertThat(dataRecords.get(3).getCsn(), is(100L));
+    }
+    
+    private DataRecord generateDataRecord(final Long csn) {
+        DataRecord result = new DataRecord(new PlaceholderPosition(), 0);
+        result.setCsn(csn);
+        return result;
+    }
+}
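
Note: the ordering asserted above (null CSN first, then ascending) can be expressed with a nulls-first comparator; a sketch of one way to realize it, not necessarily the generator's actual implementation:

    Comparator<DataRecord> csnComparator = Comparator.comparing(DataRecord::getCsn, Comparator.nullsFirst(Comparator.naturalOrder()));
    dataRecords.sort(csnComparator);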
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
new file mode 100644
index 00000000000..658d8fcdfc3
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+public final class CDCDataRecordUtilTest {
+    
+    @Test
+    public void assertFindMinimumDataRecordAndSavePosition() throws InterruptedException {
+        final Map<CDCImporter, BlockingQueue<Record>> actualIncrementalRecordMap = new HashMap<>();
+        ArrayBlockingQueue<Record> queueFirst = new ArrayBlockingQueue<>(5);
+        queueFirst.put(generateDataRecord(0));
+        queueFirst.put(generateDataRecord(2));
+        queueFirst.put(generateDataRecord(4));
+        CDCImporter mockCdcImporterFirst = mock(CDCImporter.class);
+        actualIncrementalRecordMap.put(mockCdcImporterFirst, queueFirst);
+        ArrayBlockingQueue<Record> queueSecond = new ArrayBlockingQueue<>(5);
+        queueSecond.put(generateDataRecord(1));
+        queueSecond.put(generateDataRecord(3));
+        queueSecond.put(generateDataRecord(5));
+        CDCImporter mockCdcImporterSecond = mock(CDCImporter.class);
+        actualIncrementalRecordMap.put(mockCdcImporterSecond, queueSecond);
+        Comparator<DataRecord> dataRecordComparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
+        final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
+        for (long i = 0; i <= 5; i++) {
+            DataRecord minimumDataRecord = CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
+            assertThat(minimumDataRecord.getCsn(), is(i));
+        }
+        assertNull(CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap, dataRecordComparator, cdcAckPositionMap));
+    }
+    
+    private DataRecord generateDataRecord(final long csn) {
+        DataRecord dataRecord = new DataRecord(new PlaceholderPosition(), 0);
+        dataRecord.setCsn(csn);
+        return dataRecord;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtilTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtilTest.java
new file mode 100644
index 00000000000..bf5dbf1266b
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtilTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.StringValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.LocalTimeValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.NullValue;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.util.Date;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertTrue;
+
+public final class ColumnValueConvertUtilTest {
+    
+    @Test
+    public void assertConvertToProtobufMessage() {
+        Message actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(null);
+        assertTrue(actualMessage instanceof NullValue);
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(1);
+        assertTrue(actualMessage instanceof Int32Value);
+        assertThat(((Int32Value) actualMessage).getValue(), is(1));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage((byte) 1);
+        assertTrue(actualMessage instanceof Int32Value);
+        assertThat(((Int32Value) actualMessage).getValue(), is(1));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage((short) 1);
+        assertTrue(actualMessage instanceof Int32Value);
+        assertThat(((Int32Value) actualMessage).getValue(), is(1));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(1L);
+        assertTrue(actualMessage instanceof Int64Value);
+        assertThat(((Int64Value) actualMessage).getValue(), is(1L));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(new BigInteger("1234"));
+        assertTrue(actualMessage instanceof BigIntegerValue);
+        assertThat(new BigInteger(((BigIntegerValue) actualMessage).getValue().toByteArray()), is(new BigInteger("1234")));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(1.0F);
+        assertTrue(actualMessage instanceof FloatValue);
+        assertThat(((FloatValue) actualMessage).getValue(), is(1.0F));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(1.23);
+        assertTrue(actualMessage instanceof DoubleValue);
+        assertThat(((DoubleValue) actualMessage).getValue(), is(1.23));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(new BigDecimal("100"));
+        assertTrue(actualMessage instanceof BigDecimalValue);
+        assertThat(((BigDecimalValue) actualMessage).getValue(), is("100"));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage("abcd");
+        assertTrue(actualMessage instanceof StringValue);
+        assertThat(((StringValue) actualMessage).getValue(), is("abcd"));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(true);
+        assertTrue(actualMessage instanceof BoolValue);
+        assertTrue(((BoolValue) actualMessage).getValue());
+        Timestamp now = new Timestamp(System.currentTimeMillis());
+        long epochSecond = now.toInstant().getEpochSecond();
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(now.toLocalDateTime());
+        assertTrue(actualMessage instanceof com.google.protobuf.Timestamp);
+        assertThat(((com.google.protobuf.Timestamp) actualMessage).getSeconds(), is(epochSecond));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(now);
+        assertTrue(actualMessage instanceof com.google.protobuf.Timestamp);
+        assertThat(((com.google.protobuf.Timestamp) actualMessage).getSeconds(), is(epochSecond));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(new Date());
+        assertTrue(actualMessage instanceof com.google.protobuf.Timestamp);
+        assertThat(((com.google.protobuf.Timestamp) actualMessage).getSeconds(), is(epochSecond));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(now.toInstant());
+        assertTrue(actualMessage instanceof com.google.protobuf.Timestamp);
+        assertThat(((com.google.protobuf.Timestamp) actualMessage).getNanos(), is(now.toInstant().getNano()));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(now.toLocalDateTime().toLocalTime());
+        assertTrue(actualMessage instanceof LocalTimeValue);
+        assertThat(((LocalTimeValue) actualMessage).getValue(), is(now.toLocalDateTime().toLocalTime().toString()));
+        actualMessage = ColumnValueConvertUtil.convertToProtobufMessage("123456".getBytes());
+        assertTrue(actualMessage instanceof BytesValue);
+        assertThat(((BytesValue) actualMessage).getValue().toByteArray(), is("123456".getBytes()));
+    }
+}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 3eac0066d39..00b3c76c452 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -66,34 +66,32 @@ message CreateSubscriptionRequest {
     string schema = 1;
     string name = 2;
   }
-  repeated TableName tableNames = 2;
-  string subscriptionName = 3;
-  SubscriptionMode subscriptionMode = 4;
-
+  repeated TableName table_names = 2;
+  string subscription_name = 3;
   enum SubscriptionMode {
     UNKNOWN = 0;
     INCREMENTAL = 1;
     FULL = 2;
   }
+  SubscriptionMode subscription_mode = 4;
+  bool incremental_global_orderly = 5;
 }
 
 message StartSubscriptionRequest {
   string database = 1;
-  string subscriptionName = 2;
+  string subscription_name = 2;
 }
 
 message StopSubscriptionRequest {
   string database = 1;
-  string subscriptionName = 2;
+  string subscription_name = 2;
 }
 
 message DropSubscriptionRequest {
   string database = 1;
-  string subscriptionName = 2;
+  string subscription_name = 2;
 }
 
 message AckRequest {
-  string database = 1;
-  string subscriptionName = 2;
   string ack_id = 3;
 }
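
Note: renaming a protobuf field does not break existing clients, since only field numbers matter on the wire, and protobuf codegen derives camelCase Java accessors from snake_case names, so the generated API is also unchanged. A sketch of a request using the new incremental_global_orderly field; all literal values are placeholders:

    CreateSubscriptionRequest request = CreateSubscriptionRequest.newBuilder()
            .setDatabase("sharding_db")
            .addTableNames(CreateSubscriptionRequest.TableName.newBuilder().setName("t_order").build())
            .setSubscriptionName("t_order_subscription")
            .setSubscriptionMode(CreateSubscriptionRequest.SubscriptionMode.INCREMENTAL)
            .setIncrementalGlobalOrderly(true)    // new field 5
            .build();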
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
index a583f3f664c..49c82f09433 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -47,7 +47,7 @@ message ServerGreetingResult {
 }
 
 message CreateSubscriptionResult {
-  string subscriptionName = 1;
+  string subscription_name = 1;
   bool existing = 2;
 }
 
@@ -63,10 +63,18 @@ message BigDecimalValue {
   string value = 1;
 }
 
+message LocalTimeValue {
+  string value = 1;
+}
+
 message ClobValue {
   string value = 1;
 }
 
+message BlobValue {
+  bytes value = 1;
+}
+
 message DataRecordResult {
   message Record {
     map<string, google.protobuf.Any> before = 1;
@@ -74,9 +82,9 @@ message DataRecordResult {
     message TableMetaData {
       string database = 1;
       optional string schema = 2;
-      string tableName = 3;
+      string table_name = 3;
     }
-    TableMetaData tableMetaData = 3;
+    TableMetaData table_meta_data = 3;
     int64 transaction_commit_millis = 4;
     enum DataChangeType {
       UNKNOWN = 0;
@@ -90,8 +98,9 @@ message DataRecordResult {
       ALTER_INDEX = 8;
       DROP_INDEX = 9;
     }
-    bool isDDL = 6;
-    optional string ddlSQL = 7;
+    DataChangeType data_change_type = 5;
+    bool is_ddl = 6;
+    optional string ddl_sql = 7;
   }
   string ack_id = 1;
   repeated Record records = 2;
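
Note: a client-side sketch of consuming a pushed DataRecordResult and acknowledging it with the carried ack_id; result stands for a message received from the server (placeholder variable):

    for (DataRecordResult.Record each : result.getRecordsList()) {
        DataRecordResult.Record.TableMetaData metaData = each.getTableMetaData();
        String qualifiedTable = metaData.getDatabase() + "." + metaData.getTableName();
        // each.getBeforeMap() / each.getAfterMap() hold the Any-packed column values.
    }
    AckRequest ackRequest = AckRequest.newBuilder().setAckId(result.getAckId()).build();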
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
index ff8ad06e468..ba0f86cc7d0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
 /**
@@ -32,7 +33,7 @@ public final class DataSourceImporterCreator implements ImporterCreator {
     @Override
     public Importer createImporter(final ImporterConfiguration importerConfig,
                                    final ImporterConnector importerConnector, final PipelineChannel channel,
-                                   final PipelineJobProgressListener jobProgressListener) {
+                                   final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
         return new DataSourceImporter(importerConfig, importerConnector, channel, jobProgressListener);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
index 02d8146909a..4d6a2d26969 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
@@ -19,8 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.record;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -80,4 +82,21 @@ public final class RecordUtil {
         }
         return result;
     }
+    
+    /**
+     * Get last normal record.
+     *
+     * @param records records
+     * @return last normal record
+     */
+    public static Record getLastNormalRecord(final List<Record> records) {
+        for (int index = records.size() - 1; index >= 0; index--) {
+            Record record = records.get(index);
+            if (record.getPosition() instanceof PlaceholderPosition) {
+                continue;
+            }
+            return record;
+        }
+        return null;
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 5e08786080d..3d0e3df1518 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
@@ -96,7 +97,8 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
                                                  final PipelineJobProgressListener jobProgressListener) {
         Collection<Importer> result = new LinkedList<>();
         for (int i = 0; i < concurrency; i++) {
-            result.add(TypedSPIRegistry.getRegisteredService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener));
+            result.add(TypedSPIRegistry.getRegisteredService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener,
+                    ImporterType.INCREMENTAL));
         }
         return result;
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 6ee2ead917e..01fc4c91e04 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -34,7 +33,9 @@ import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskPr
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
@@ -42,7 +43,6 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 import javax.sql.DataSource;
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -77,7 +77,8 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
         this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
         channel = createChannel(pipelineChannelCreator);
         dumper = new InventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
-        importer = TypedSPIRegistry.getRegisteredService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener);
+        importer = TypedSPIRegistry.getRegisteredService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener,
+                ImporterType.INVENTORY);
         position = inventoryDumperConfig.getPosition();
     }
     
@@ -120,24 +121,13 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
     
     private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator) {
         return pipelineChannelCreator.createPipelineChannel(1, records -> {
-            Record lastNormalRecord = getLastNormalRecord(records);
+            Record lastNormalRecord = RecordUtil.getLastNormalRecord(records);
             if (null != lastNormalRecord) {
                 position = lastNormalRecord.getPosition();
             }
         });
     }
     
-    private Record getLastNormalRecord(final List<Record> records) {
-        for (int index = records.size() - 1; index >= 0; index--) {
-            Record record = records.get(index);
-            if (record.getPosition() instanceof PlaceholderPosition) {
-                continue;
-            }
-            return record;
-        }
-        return null;
-    }
-    
     @Override
     public void stop() {
         dumper.stop();
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index dec16de00cc..0381f69b1a3 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -34,7 +34,7 @@ public final class OpenGaussIncrementalDumperCreator implements IncrementalDumpe
     @Override
     public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader, false);
+        return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader);
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 4830429699d..cd144dec30d 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -67,7 +67,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
     
     public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
-                              final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader, final boolean decodeWithTX) {
+                              final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
@@ -75,7 +75,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
         this.channel = channel;
         walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
         logicalReplication = new OpenGaussLogicalReplication();
-        this.decodeWithTX = decodeWithTX;
+        this.decodeWithTX = dumperConfig.isDecodeWithTX();
     }
     
     @Override
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 88f17388750..44b16467d64 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -20,15 +20,20 @@ package org.apache.shardingsphere.proxy.backend.handler.cdc;
 import com.google.common.base.Strings;
 import io.netty.channel.Channel;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
@@ -45,6 +50,8 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.TableRule;
 
+import java.sql.SQLException;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -66,13 +73,13 @@ public final class CDCBackendHandler {
      * @return CDC response
      */
     public CDCResponse createSubscription(final CDCRequest request) {
-        CreateSubscriptionRequest subscriptionRequest = request.getCreateSubscription();
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(subscriptionRequest.getDatabase());
+        CreateSubscriptionRequest createSubscription = request.getCreateSubscription();
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(createSubscription.getDatabase());
         if (null == database) {
-            return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", subscriptionRequest.getDatabase()));
+            return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, String.format("database %s does not exist", createSubscription.getDatabase()));
         }
         List<String> tableNames = new LinkedList<>();
-        for (TableName each : subscriptionRequest.getTableNamesList()) {
+        for (TableName each : createSubscription.getTableNamesList()) {
             tableNames.add(Strings.isNullOrEmpty(each.getSchema()) ? each.getName() : String.join(".", each.getSchema(), each.getName()));
         }
         Optional<ShardingRule> rule = database.getRuleMetaData().getRules().stream().filter(each -> each instanceof ShardingRule).map(each -> (ShardingRule) each).findFirst();
@@ -83,14 +90,14 @@ public final class CDCBackendHandler {
         for (String each : tableNames) {
             actualDataNodesMap.put(each, getActualDataNodes(rule.get(), each));
         }
-        CreateSubscriptionJobParameter parameter = new CreateSubscriptionJobParameter(subscriptionRequest.getDatabase(), tableNames, subscriptionRequest.getSubscriptionName(),
-                subscriptionRequest.getSubscriptionMode().name(), actualDataNodesMap);
+        CreateSubscriptionJobParameter parameter = new CreateSubscriptionJobParameter(createSubscription.getDatabase(), tableNames, createSubscription.getSubscriptionName(),
+                createSubscription.getSubscriptionMode().name(), actualDataNodesMap, createSubscription.getIncrementalGlobalOrderly());
         if (jobAPI.createJob(parameter)) {
             return CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
-                    .setSubscriptionName(subscriptionRequest.getSubscriptionName()).setExisting(false).build()).build();
+                    .setSubscriptionName(createSubscription.getSubscriptionName()).setExisting(false).build()).build();
         } else {
             return CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
-                    .setSubscriptionName(subscriptionRequest.getSubscriptionName()).setExisting(true).build()).build();
+                    .setSubscriptionName(createSubscription.getSubscriptionName()).setExisting(true).build()).build();
         }
     }
     
@@ -111,17 +118,20 @@ public final class CDCBackendHandler {
     public CDCResponse startSubscription(final CDCRequest request, final Channel channel, final CDCConnectionContext connectionContext) {
         StartSubscriptionRequest startSubscriptionRequest = request.getStartSubscription();
         String jobId = jobAPI.marshalJobId(new CDCJobId(startSubscriptionRequest.getDatabase(), startSubscriptionRequest.getSubscriptionName()));
-        JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
-        if (null == jobConfigPOJO) {
+        CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) jobAPI.getJobConfiguration(jobId);
+        if (null == cdcJobConfig) {
             return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist",
                     startSubscriptionRequest.getSubscriptionName()));
         }
-        if (!jobConfigPOJO.isDisabled()) {
-            return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, String.format("the %s is being used", startSubscriptionRequest.getSubscriptionName()));
-        }
+        JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+        // TODO ensure there is only one consumer at a time; the disabled flag may not be updated if the process exits abnormally
         jobConfigPOJO.setDisabled(false);
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
-        CDCJob job = new CDCJob(new CDCImporterConnector(channel));
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabase());
+        Comparator<DataRecord> dataRecordComparator = cdcJobConfig.isDecodeWithTX()
+                ? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
+                : null;
+        CDCJob job = new CDCJob(new CDCImporterConnector(channel, cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), cdcJobConfig.getTableNames(), dataRecordComparator));
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
         job.setJobBootstrap(oneOffJobBootstrap);
@@ -137,9 +147,32 @@ public final class CDCBackendHandler {
      * @param jobId job id
      */
     public void stopSubscription(final String jobId) {
+        if (Strings.isNullOrEmpty(jobId)) {
+            log.warn("job id is null or empty, ignored");
+            return;
+        }
         PipelineJobCenter.stop(jobId);
         JobConfigurationPOJO jobConfig = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
         jobConfig.setDisabled(true);
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfig);
     }
+    
+    /**
+     * Drop subscription.
+     *
+     * @param jobId job id
+     * @throws SQLException SQL exception
+     */
+    public void dropSubscription(final String jobId) throws SQLException {
+        jobAPI.rollback(jobId);
+    }
+    
+    /**
+     * Process ack.
+     *
+     * @param ackRequest ack request
+     */
+    public void processAck(final AckRequest ackRequest) {
+        CDCAckHolder.getInstance().ack(ackRequest.getAckId());
+    }
 }
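
Note on the hunk above: when decodeWithTX is enabled, startSubscription wires a Comparator<DataRecord> into CDCImporterConnector so pushed records can be replayed in transaction order. The body of DataRecordComparatorGenerator is not part of this hunk; the following is only a minimal sketch of such an ordering, assuming a hypothetical commit-sequence accessor DataRecord#getCsn (the generator added by this commit may differ):

    import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
    
    import java.util.Comparator;
    
    public final class CsnComparatorSketch {
        
        /**
         * Sketch only: order data records by an assumed commit sequence number (CSN),
         * treating records without a CSN as earliest. getCsn is a hypothetical accessor.
         */
        public static Comparator<DataRecord> build() {
            return Comparator.comparing(DataRecord::getCsn, Comparator.nullsFirst(Comparator.naturalOrder()));
        }
    }
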
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 451e6ac665e..965e0ea75cf 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
@@ -44,6 +45,7 @@ import org.apache.shardingsphere.proxy.backend.handler.cdc.CDCBackendHandler;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
@@ -97,9 +99,10 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
                 stopStartSubscription(ctx, request, connectionContext);
                 break;
             case DROP_SUBSCRIPTION:
-                dropStartSubscription(ctx, request);
+                dropStartSubscription(ctx, request, connectionContext);
                 break;
             case ACK_REQUEST:
+                processAckRequest(ctx, request);
                 break;
             default:
                 log.warn("Cannot handle this type of request {}", request);
@@ -159,19 +162,38 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
         if (startSubscriptionRequest.getDatabase().isEmpty() || startSubscriptionRequest.getSubscriptionName().isEmpty()) {
             ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start subscription request parameter"))
                     .addListener(ChannelFutureListener.CLOSE);
+            return;
         }
         CDCResponse response = backendHandler.startSubscription(request, ctx.channel(), connectionContext);
         ctx.writeAndFlush(response);
     }
     
     private void stopStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
-        // TODO waiting for pipeline refactoring finished
+        backendHandler.stopSubscription(connectionContext.getJobId());
         connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
         ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }
     
-    private void dropStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request) {
-        // TODO waiting for pipeline refactoring finished
-        ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+    private void dropStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        try {
+            backendHandler.dropSubscription(connectionContext.getJobId());
+            ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+        } catch (final SQLException ex) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, ex.getMessage()));
+        }
+    }
+    
+    private void processAckRequest(final ChannelHandlerContext ctx, final CDCRequest request) {
+        if (!request.hasAckRequest()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Missing ack request body"))
+                    .addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        AckRequest ackRequest = request.getAckRequest();
+        if (ackRequest.getAckId().isEmpty()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal ack request parameter"));
+            return;
+        }
+        backendHandler.processAck(ackRequest);
     }
 }
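
For context on the ACK_REQUEST branch above: a client acknowledges consumed records by echoing back the ack id delivered with the pushed data, which processAck then resolves via CDCAckHolder. A minimal client-side sketch using the builders protobuf generates from the request protocol (builder and field names follow standard protobuf codegen conventions and are assumptions here):

    import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
    import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
    
    public final class AckRequestSketch {
        
        /**
         * Sketch only: build an ACK_REQUEST message for a previously received ack id.
         * Depending on the .proto definition, a request type field (e.g. setType(Type.ACK_REQUEST)) must also be set before sending.
         */
        public static CDCRequest buildAck(final String requestId, final String ackId) {
            AckRequest ackRequest = AckRequest.newBuilder().setAckId(ackId).build();
            return CDCRequest.newBuilder().setRequestId(requestId).setAckRequest(ackRequest).build();
        }
    }
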
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
index a227fc74322..c672c8497fd 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
 /**
@@ -32,7 +33,7 @@ public final class FixtureImporterCreator implements ImporterCreator {
     @Override
     public Importer createImporter(final ImporterConfiguration importerConfig,
                                    final ImporterConnector importerConnector, final PipelineChannel channel,
-                                   final PipelineJobProgressListener jobProgressListener) {
+                                   final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
         return new FixtureImporter(importerConfig, importerConnector, channel, jobProgressListener);
     }
     
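
The SPI change above threads the new ImporterType parameter through every ImporterCreator implementation; this fixture simply ignores it. A purely hypothetical creator that inspects the type before delegating (class name and behavior are illustrative, imports mirror the fixture above, and any other SPI methods are omitted) might look like:

    public final class TypeAwareImporterCreator implements ImporterCreator {
        
        @Override
        public Importer createImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector,
                                       final PipelineChannel channel, final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
            // A real implementation could return a different Importer variant per type.
            System.out.printf("creating importer of type %s%n", importerType);
            return new FixtureImporter(importerConfig, importerConnector, channel, jobProgressListener);
        }
    }
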
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/record/RecordUtilTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/record/RecordUtilTest.java
index 2a83bddaaea..dac54263d90 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/record/RecordUtilTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/record/RecordUtilTest.java
@@ -17,18 +17,25 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.record;
 
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 public final class RecordUtilTest {
@@ -67,4 +74,14 @@ public final class RecordUtilTest {
         result.addColumn(new Column("c3", "", true, false));
         return result;
     }
+    
+    @Test
+    public void assertGetLastNormalRecord() {
+        List<Record> records = Arrays.asList(new DataRecord(new IntegerPrimaryKeyPosition(0, 1), 0), new PlaceholderRecord(new PlaceholderPosition()));
+        Record actual = RecordUtil.getLastNormalRecord(records);
+        assertThat(actual, instanceOf(DataRecord.class));
+        records = Arrays.asList(new DataRecord(new IntegerPrimaryKeyPosition(0, 1), 0), new PlaceholderRecord(new PlaceholderPosition()), new FinishedRecord(new FinishedPosition()));
+        actual = RecordUtil.getLastNormalRecord(records);
+        assertThat(actual, instanceOf(FinishedRecord.class));
+    }
 }
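
The two assertions above pin down the contract of RecordUtil.getLastNormalRecord: scan the list from the tail and return the first record that is not a placeholder. A sketch consistent with that contract (the actual implementation in this commit may differ, for example in how an all-placeholder list is handled):

    import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
    import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
    
    import java.util.List;
    
    public final class RecordUtilSketch {
        
        /**
         * Sketch only: return the last record that is not a placeholder, or null if every record is one.
         */
        public static Record getLastNormalRecord(final List<Record> records) {
            for (int i = records.size() - 1; i >= 0; i--) {
                Record record = records.get(i);
                if (!(record instanceof PlaceholderRecord)) {
                    return record;
                }
            }
            return null;
        }
    }
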
