This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c0c9717f617 Add rollback/commit streaming DistSQL implementation (#25289)
c0c9717f617 is described below

commit c0c9717f61751a2a49636f47bb8c39ba163b7370
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Apr 26 19:09:01 2023 +0800

    Add rollback/commit streaming DistSQL implementation (#25289)
    
    * Add drop streaming DistSQL and implementation
    
    * Add drop streaming test case
    
    * Extraction methods for reuse
    
    * Use awaitility to fix occasional errors
    
    * Rename drop to rollback of streaming
    
    * Add commit DistSQL
    
    * Improve comment
    
    * Increment sleep seconds
    
    * Revert "Increment sleep seconds"
    
    This reverts commit ac168023a6961d12d3e37e1ce4ce43eb060bc5a9.
    
    * Fix commit statement not matching
    
    * Process commit streaming
---
 .../ral/pipeline/cdc/UpdatableCDCRALStatement.java | 28 +++--------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 55 ++++++++++++++++++--
 .../data/pipeline/cdc/core/job/CDCJobId.java       |  5 +-
 .../pipeline/cdc/handler}/CDCBackendHandler.java   | 58 ++++++++++------------
 .../data/pipeline/cdc/core/job/CDCJobIdTest.java   |  3 +-
 .../src/main/proto/CDCRequestProtocol.proto        | 12 +++--
 .../pipeline/core/importer/DataSourceImporter.java |  1 -
 .../handler/update/CommitStreamingUpdater.java}    | 32 ++++++------
 .../handler/update/RollbackStreamingUpdater.java}  | 32 ++++++------
 ...ingsphere.distsql.handler.ral.update.RALUpdater |  2 +
 .../distsql/parser/autogen/CDCDistSQLStatement.g4  |  2 +
 .../parser/src/main/antlr4/imports/cdc/Keyword.g4  |  8 +++
 .../src/main/antlr4/imports/cdc/RALStatement.g4    |  8 +++
 .../parser/core/CDCDistSQLStatementVisitor.java    | 14 ++++++
 .../statement/CommitStreamingStatement.java}       | 20 +++++---
 .../statement/RollbackStreamingStatement.java}     | 20 +++++---
 .../frontend/netty/CDCChannelInboundHandler.java   | 35 +++++++++----
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  2 +
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  2 +
 .../UpdatablePipelineRALStatementAssert.java       | 12 ++++-
 .../cdc/CommitStreamingStatementAssert.java        | 46 +++++++++++++++++
 .../cdc/RollbackStreamingStatementAssert.java      | 46 +++++++++++++++++
 .../cases/parser/jaxb/RootSQLParserTestCases.java  |  8 +++
 .../ral/cdc/CommitStreamingStatementTestCase.java  | 27 +++++-----
 .../cdc/RollbackStreamingStatementTestCase.java    | 27 +++++-----
 test/it/parser/src/main/resources/case/ral/cdc.xml |  8 +++
 .../src/main/resources/sql/supported/ral/cdc.xml   |  2 +
 27 files changed, 367 insertions(+), 148 deletions(-)

diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4 
b/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/parser/statement/ral/pipeline/cdc/UpdatableCDCRALStatement.java
similarity index 70%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
copy to 
distsql/statement/src/main/java/org/apache/shardingsphere/distsql/parser/statement/ral/pipeline/cdc/UpdatableCDCRALStatement.java
index 4f9a2b84b3a..120cc16e584 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
+++ 
b/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/parser/statement/ral/pipeline/cdc/UpdatableCDCRALStatement.java
@@ -15,26 +15,12 @@
  * limitations under the License.
  */
 
-lexer grammar Keyword;
+package org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc;
 
-import Alphabet;
+import 
org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.UpdatablePipelineRALStatement;
 
-WS
-    : [ \t\r\n] + ->skip
-    ;
-
-SHOW
-    : S H O W
-    ;
-
-STREAMING
-    : S T R E A M I N G
-    ;
-
-LIST
-    : L I S T
-    ;
-
-STATUS
-    : S T A T U S
-    ;
+/**
+ * Updatable CDC RAL statement.
+ */
+public abstract class UpdatableCDCRALStatement extends 
UpdatablePipelineRALStatement implements CDCRALStatement {
+}
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 2d11a35d2c7..4089bdfbbf3 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -65,14 +65,17 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManag
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtils;
+import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -206,8 +209,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     }
     
     private String generateJobId(final PipelineContextKey contextKey, final 
YamlCDCJobConfiguration config) {
-        // TODO generate parameter add sink type
-        CDCJobId jobId = new CDCJobId(contextKey, 
config.getSchemaTableNames(), config.isFull());
+        CDCJobId jobId = new CDCJobId(contextKey, 
config.getSchemaTableNames(), config.isFull(), 
config.getSinkConfig().getSinkType());
         return marshalJobId(jobId);
     }
     
@@ -218,6 +220,40 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
     }
     
+    /**
+     * Start job.
+     *
+     * @param jobId             job id
+     * @param importerConnector importer connector
+     */
+    public void startJob(final String jobId, final ImporterConnector 
importerConnector) {
+        CDCJob job = new CDCJob(importerConnector);
+        PipelineJobCenter.addJob(jobId, job);
+        updateJobConfigurationDisabled(jobId, false);
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfigPOJO.toJobConfiguration());
+        job.setJobBootstrap(oneOffJobBootstrap);
+        oneOffJobBootstrap.execute();
+    }
+    
+    /**
+     * Update job configuration disabled.
+     *
+     * @param jobId    job id
+     * @param disabled disabled
+     */
+    public void updateJobConfigurationDisabled(final String jobId, final 
boolean disabled) {
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        jobConfigPOJO.setDisabled(disabled);
+        if (disabled) {
+            jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
+        } else {
+            jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
+            jobConfigPOJO.getProps().remove("stop_time_millis");
+        }
+        
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
+    }
+    
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
pipelineJobConfig;
@@ -298,13 +334,24 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     public void rollback(final String jobId) throws SQLException {
-        stop(jobId);
+        CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
+        if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
+            PipelineJobCenter.stop(jobId);
+        } else {
+            stop(jobId);
+        }
         dropJob(jobId);
     }
     
     @Override
     public void commit(final String jobId) {
-        throw new UnsupportedOperationException();
+        CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
+        if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
+            PipelineJobCenter.stop(jobId);
+        } else {
+            stop(jobId);
+        }
+        dropJob(jobId);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
index ec62190399a..d05da53fe92 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
@@ -36,9 +36,12 @@ public final class CDCJobId extends AbstractPipelineJobId {
     
     private final boolean full;
     
-    public CDCJobId(final PipelineContextKey contextKey, final List<String> 
schemaTableNames, final boolean full) {
+    private final String sinkType;
+    
+    public CDCJobId(final PipelineContextKey contextKey, final List<String> 
schemaTableNames, final boolean full, final String sinkType) {
         super(new CDCJobType(), contextKey);
         this.schemaTableNames = schemaTableNames;
         this.full = full;
+        this.sinkType = sinkType;
     }
 }
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
similarity index 82%
rename from 
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
rename to 
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 84747d34ac0..99766be2e85 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.proxy.backend.handler.cdc;
+package org.apache.shardingsphere.data.pipeline.cdc.handler;
 
 import com.google.common.base.Strings;
 import io.netty.channel.Channel;
@@ -28,7 +28,6 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
-import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
@@ -41,14 +40,9 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtils;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -75,6 +69,16 @@ public final class CDCBackendHandler {
     
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
+    /**
+     * Get database name by job id.
+     *
+     * @param jobId job id
+     * @return database
+     */
+    public String getDatabaseNameByJobId(final String jobId) {
+        return jobAPI.getJobConfiguration(jobId).getDatabaseName();
+    }
+    
     /**
      * Stream data.
      *
@@ -115,16 +119,6 @@ public final class CDCBackendHandler {
         return 
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
-    /**
-     * Get database name by job id.
-     *
-     * @param jobId job id
-     * @return database
-     */
-    public String getDatabaseNameByJobId(final String jobId) {
-        return jobAPI.getJobConfiguration(jobId).getDatabaseName();
-    }
-    
     /**
      * Start streaming.
      *
@@ -138,20 +132,11 @@ public final class CDCBackendHandler {
         if (PipelineJobCenter.isJobExisting(jobId)) {
             PipelineJobCenter.stop(jobId);
         }
-        JobConfigurationAPI jobConfigAPI = 
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId));
-        JobConfigurationPOJO jobConfigPOJO = 
jobConfigAPI.getJobConfiguration(jobId);
-        // TODO, ensure that there is only one consumer at a time, job config 
disable may not be updated when the program is forced to close
-        jobConfigPOJO.setDisabled(false);
-        jobConfigAPI.updateJobConfiguration(jobConfigPOJO);
         ShardingSphereDatabase database = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
         Comparator<DataRecord> dataRecordComparator = 
cdcJobConfig.isDecodeWithTX()
                 ? 
DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
                 : null;
-        CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, 
database, cdcJobConfig.getJobShardingCount(), 
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
-        PipelineJobCenter.addJob(jobId, job);
-        OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
 job, jobConfigPOJO.toJobConfiguration());
-        job.setJobBootstrap(oneOffJobBootstrap);
-        oneOffJobBootstrap.execute();
+        jobAPI.startJob(jobId, new SocketSinkImporterConnector(channel, 
database, cdcJobConfig.getJobShardingCount(), 
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
         connectionContext.setJobId(jobId);
     }
     
@@ -166,22 +151,29 @@ public final class CDCBackendHandler {
             return;
         }
         PipelineJobCenter.stop(jobId);
-        JobConfigurationAPI jobConfigAPI = 
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId));
-        JobConfigurationPOJO jobConfig = 
jobConfigAPI.getJobConfiguration(jobId);
-        jobConfig.setDisabled(true);
-        jobConfigAPI.updateJobConfiguration(jobConfig);
+        jobAPI.updateJobConfigurationDisabled(jobId, true);
     }
     
     /**
-     * Drop streaming.
+     * Rollback streaming.
      *
      * @param jobId job id.
      * @throws SQLException sql exception
      */
-    public void dropStreaming(final String jobId) throws SQLException {
+    public void rollbackStreaming(final String jobId) throws SQLException {
         jobAPI.rollback(jobId);
     }
     
+    /**
+     * Commit streaming.
+     *
+     * @param jobId job id.
+     * @throws SQLException sql exception
+     */
+    public void commitStreaming(final String jobId) throws SQLException {
+        jobAPI.commit(jobId);
+    }
+    
     /**
      * Process ack.
      *
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index 5255981c8b5..e16e17cdb64 100644
--- 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.job;
 
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
@@ -34,7 +35,7 @@ class CDCJobIdTest {
     @Test
     void parseJobType() {
         PipelineContextKey contextKey = 
PipelineContextKey.build("sharding_db", InstanceType.PROXY);
-        CDCJobId pipelineJobId = new CDCJobId(contextKey, 
Arrays.asList("test", "t_order"), false);
+        CDCJobId pipelineJobId = new CDCJobId(contextKey, 
Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
         String jobId = 
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
         JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
         assertThat(actualJobType, instanceOf(CDCJobType.class));
diff --git 
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto 
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 8e322293cfa..1ec532fa64e 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -31,7 +31,8 @@ message CDCRequest {
     ACK_STREAMING = 3;
     STOP_STREAMING = 4;
     START_STREAMING = 5;
-    DROP_STREAMING = 6;
+    ROLLBACK_STREAMING = 6;
+    COMMIT_STREAMING = 7;
   }
   Type type = 3;
   oneof request_body {
@@ -40,7 +41,8 @@ message CDCRequest {
     AckStreamingRequestBody ack_streaming_request_body = 6;
     StopStreamingRequestBody stop_streaming_request_body = 7;
     StartStreamingRequestBody start_streaming_request_body = 8;
-    DropStreamingRequestBody drop_streaming_request_body = 9;
+    RollbackStreamingRequestBody rollback_streaming_request_body = 9;
+    CommitStreamingRequestBody commit_streaming_request_body = 10;
   }
 }
 
@@ -82,6 +84,10 @@ message StartStreamingRequestBody {
   string streaming_id = 1;
 }
 
-message DropStreamingRequestBody {
+message RollbackStreamingRequestBody {
+  string streaming_id = 1;
+}
+
+message CommitStreamingRequestBody {
   string streaming_id = 1;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 1ba4b49f1f5..7e94bfc009f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -265,7 +265,6 @@ public final class DataSourceImporter extends 
AbstractLifecycleExecutor implemen
             batchDeleteStatement = preparedStatement;
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
-                conditionColumns = RecordUtils.extractConditionColumns(each, 
importerConfig.getShardingColumns(each.getTableName()));
                 for (int i = 0; i < conditionColumns.size(); i++) {
                     Object oldValue = conditionColumns.get(i).getOldValue();
                     if (null == oldValue) {
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
similarity index 51%
copy from 
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
copy to 
kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
index ec62190399a..ec8e7eddcc9 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
@@ -15,30 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.core.job;
+package org.apache.shardingsphere.cdc.distsql.handler.update;
 
-import lombok.Getter;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import 
org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 
-import java.util.List;
+import java.sql.SQLException;
 
 /**
- * CDC job id.
+ * Commit streaming updater.
  */
-@Getter
-@ToString(callSuper = true)
-public final class CDCJobId extends AbstractPipelineJobId {
+public final class CommitStreamingUpdater implements 
RALUpdater<CommitStreamingStatement> {
     
-    private final List<String> schemaTableNames;
+    private final CDCJobAPI jobAPI = new CDCJobAPI();
     
-    private final boolean full;
+    @Override
+    public void executeUpdate(final String databaseName, final 
CommitStreamingStatement sqlStatement) throws SQLException {
+        jobAPI.commit(sqlStatement.getJobId());
+    }
     
-    public CDCJobId(final PipelineContextKey contextKey, final List<String> 
schemaTableNames, final boolean full) {
-        super(new CDCJobType(), contextKey);
-        this.schemaTableNames = schemaTableNames;
-        this.full = full;
+    @Override
+    public String getType() {
+        return CommitStreamingStatement.class.getName();
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
similarity index 51%
copy from 
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
copy to 
kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
index ec62190399a..a6a2ef579d4 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
@@ -15,30 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.core.job;
+package org.apache.shardingsphere.cdc.distsql.handler.update;
 
-import lombok.Getter;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import 
org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 
-import java.util.List;
+import java.sql.SQLException;
 
 /**
- * CDC job id.
+ * Rollback streaming updater.
  */
-@Getter
-@ToString(callSuper = true)
-public final class CDCJobId extends AbstractPipelineJobId {
+public final class RollbackStreamingUpdater implements 
RALUpdater<RollbackStreamingStatement> {
     
-    private final List<String> schemaTableNames;
+    private final CDCJobAPI jobAPI = new CDCJobAPI();
     
-    private final boolean full;
+    @Override
+    public void executeUpdate(final String databaseName, final 
RollbackStreamingStatement sqlStatement) throws SQLException {
+        jobAPI.rollback(sqlStatement.getJobId());
+    }
     
-    public CDCJobId(final PipelineContextKey contextKey, final List<String> 
schemaTableNames, final boolean full) {
-        super(new CDCJobType(), contextKey);
-        this.schemaTableNames = schemaTableNames;
-        this.full = full;
+    @Override
+    public String getType() {
+        return RollbackStreamingStatement.class.getName();
     }
 }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
 
b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
index 0ad61f97de2..63de7ed6cac 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
@@ -26,3 +26,5 @@ 
org.apache.shardingsphere.migration.distsql.handler.update.CheckMigrationJobUpda
 
org.apache.shardingsphere.migration.distsql.handler.update.StartMigrationCheckUpdater
 
org.apache.shardingsphere.migration.distsql.handler.update.StopMigrationCheckUpdater
 
org.apache.shardingsphere.migration.distsql.handler.update.DropMigrationCheckUpdater
+org.apache.shardingsphere.cdc.distsql.handler.update.RollbackStreamingUpdater
+org.apache.shardingsphere.cdc.distsql.handler.update.CommitStreamingUpdater
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index a7738218450..4a459b2f38c 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -22,5 +22,7 @@ import Symbol, RALStatement;
 execute
     : (showStreamingList
     | showStreamingStatus
+    | rollbackStreaming
+    | commitStreaming
     ) SEMI?
     ;
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
index 4f9a2b84b3a..e5b28e62df8 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
@@ -38,3 +38,11 @@ LIST
 STATUS
     : S T A T U S
     ;
+
+ROLLBACK
+    : R O L L B A C K
+    ;
+
+COMMIT
+    : C O M M I T
+    ;
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 32d299de91f..5f591718437 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -27,6 +27,14 @@ showStreamingStatus
     : SHOW STREAMING STATUS jobId
     ;
 
+rollbackStreaming
+    : ROLLBACK STREAMING jobId
+    ;
+
+commitStreaming
+    : COMMIT STREAMING jobId
+    ;
+
 jobId
     : INT_ | IDENTIFIER_ | STRING_
     ;
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
 
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
index cf845b97fab..31f93c3f279 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -18,9 +18,13 @@
 package org.apache.shardingsphere.cdc.distsql.parser.core;
 
 import org.antlr.v4.runtime.tree.ParseTree;
+import 
org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import 
org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.CommitStreamingContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RollbackStreamingContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
 import org.apache.shardingsphere.sql.parser.api.ASTNode;
@@ -42,6 +46,16 @@ public final class CDCDistSQLStatementVisitor extends 
CDCDistSQLStatementBaseVis
         return new 
ShowStreamingStatusStatement(getIdentifierValue(ctx.jobId()));
     }
     
+    @Override
+    public ASTNode visitRollbackStreaming(final RollbackStreamingContext ctx) {
+        // The statement only carries the job id; a missing jobId node yields null.
+        String jobId = getIdentifierValue(ctx.jobId());
+        return new RollbackStreamingStatement(jobId);
+    }
+    
+    @Override
+    public ASTNode visitCommitStreaming(final CommitStreamingContext ctx) {
+        // The statement only carries the job id; a missing jobId node yields null.
+        String jobId = getIdentifierValue(ctx.jobId());
+        return new CommitStreamingStatement(jobId);
+    }
+    
     private String getIdentifierValue(final ParseTree ctx) {
         return null == ctx ? null : new 
IdentifierValue(ctx.getText()).getValue();
     }
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
similarity index 66%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to 
kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
index a7738218450..212668c3354 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
@@ -15,12 +15,18 @@
  * limitations under the License.
  */
 
-grammar CDCDistSQLStatement;
+package org.apache.shardingsphere.cdc.distsql.statement;
 
-import Symbol, RALStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc.UpdatableCDCRALStatement;
 
-execute
-    : (showStreamingList
-    | showStreamingStatus
-    ) SEMI?
-    ;
+/**
+ * Commit streaming statement.
+ *
+ * <p>Carries the job id taken from a {@code COMMIT STREAMING jobId} DistSQL statement.
+ * The constructor and the getter for {@code jobId} are generated by Lombok.</p>
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CommitStreamingStatement extends UpdatableCDCRALStatement {
+    
+    private final String jobId;
+}
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
similarity index 66%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to 
kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
index a7738218450..2879748b978 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
@@ -15,12 +15,18 @@
  * limitations under the License.
  */
 
-grammar CDCDistSQLStatement;
+package org.apache.shardingsphere.cdc.distsql.statement;
 
-import Symbol, RALStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc.UpdatableCDCRALStatement;
 
-execute
-    : (showStreamingList
-    | showStreamingStatus
-    ) SEMI?
-    ;
+/**
+ * Rollback streaming statement.
+ *
+ * <p>Carries the job id taken from a {@code ROLLBACK STREAMING jobId} DistSQL statement.
+ * The constructor and the getter for {@code jobId} are generated by Lombok.</p>
+ */
+@RequiredArgsConstructor
+@Getter
+public final class RollbackStreamingStatement extends UpdatableCDCRALStatement 
{
+    
+    private final String jobId;
+}
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 8e9abbbcfbb..54310daea95 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -34,8 +34,9 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CommitStreamingRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.RollbackStreamingRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
@@ -51,7 +52,7 @@ import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.handler.cdc.CDCBackendHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -128,8 +129,11 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
             case START_STREAMING:
                 processStartStreamingRequest(ctx, request, connectionContext);
                 break;
-            case DROP_STREAMING:
-                processDropStreamingRequest(ctx, request, connectionContext);
+            case ROLLBACK_STREAMING:
+                processRollbackStreamingRequest(ctx, request, 
connectionContext);
+                break;
+            case COMMIT_STREAMING:
+                processCommitStreamingRequest(ctx, request, connectionContext);
                 break;
             default:
                 log.warn("can't handle this type of request {}", request);
@@ -213,18 +217,27 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         String database = 
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
         checkPrivileges(request.getRequestId(), 
connectionContext.getCurrentUser().getGrantee(), database);
         backendHandler.stopStreaming(connectionContext.getJobId());
-        connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
         connectionContext.setJobId(null);
         
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }
     
-    private void processDropStreamingRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
-        DropStreamingRequestBody requestBody = 
request.getDropStreamingRequestBody();
-        String database = 
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
-        checkPrivileges(request.getRequestId(), 
connectionContext.getCurrentUser().getGrantee(), database);
+    /**
+     * Handle a rollback streaming request.
+     *
+     * <p>Checks the current user's privileges on the database resolved from the streaming id in the request body,
+     * rolls back the job recorded on the connection context, clears that job id and writes a success response.
+     * A {@link SQLException} raised inside the try block is rethrown as a {@link CDCExceptionWrapper} carrying the request id.</p>
+     *
+     * @param ctx channel handler context used to write the response
+     * @param request CDC request
+     * @param connectionContext CDC connection context
+     */
+    private void processRollbackStreamingRequest(final ChannelHandlerContext 
ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        RollbackStreamingRequestBody requestBody = 
request.getRollbackStreamingRequestBody();
+        checkPrivileges(request.getRequestId(), 
connectionContext.getCurrentUser().getGrantee(), 
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
         try {
-            backendHandler.dropStreaming(connectionContext.getJobId());
-            connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+            // NOTE(review): privileges are checked against requestBody.getStreamingId(), but the rollback targets connectionContext.getJobId() -- confirm both always refer to the same job.
+            backendHandler.rollbackStreaming(connectionContext.getJobId());
+            connectionContext.setJobId(null);
+            
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+        } catch (final SQLException ex) {
+            // NOTE(review): only ex.getMessage() is forwarded; the SQLException itself is not attached as a cause here.
+            throw new CDCExceptionWrapper(request.getRequestId(), new 
CDCServerException(ex.getMessage()));
+        }
+    }
+    
+    private void processCommitStreamingRequest(final ChannelHandlerContext 
ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        CommitStreamingRequestBody requestBody = 
request.getCommitStreamingRequestBody();
+        checkPrivileges(request.getRequestId(), 
connectionContext.getCurrentUser().getGrantee(), 
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
+        try {
+            backendHandler.commitStreaming(connectionContext.getJobId());
             connectionContext.setJobId(null);
             
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
         } catch (final SQLException ex) {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 58361dd07c7..e93a4687ce9 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -149,6 +149,8 @@ class CDCE2EIT {
                     tableMetaData.getColumnNames(), primaryKeyMetaData, null, 
progressContext);
             DataConsistencyCheckResult checkResult = checker.check(new 
DataMatchDataConsistencyCalculateAlgorithm());
             assertTrue(checkResult.isMatched());
+            containerComposer.proxyExecuteWithLog(String.format("COMMIT 
STREAMING '%s'", jobId), 0);
+            assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING 
LIST").isEmpty());
         }
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index c2e631a16c0..ed078d92c19 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -103,6 +103,8 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
         startMigrationByJobId(containerComposer, jobId);
         // must refresh firstly, otherwise proxy can't get schema and table 
info
         containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
+                String.format("SELECT * FROM %s WHERE order_id = %s", 
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), 
recordId)).isEmpty());
         containerComposer.assertProxyOrderRecordExist(String.join(".", 
PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
index 97dadd8ae7d..7109e877017 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
@@ -19,6 +19,8 @@ package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import 
org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
 import 
org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.UpdatablePipelineRALStatement;
 import 
org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
 import 
org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
@@ -31,6 +33,8 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckS
 import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
 import 
org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.CommitStreamingStatementAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.RollbackStreamingStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.CheckMigrationStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.CommitMigrationStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.MigrateTableStatementAssert;
@@ -42,6 +46,8 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.r
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.UnregisterMigrationSourceStorageUnitStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.MigrateTableStatementTestCase;
@@ -61,7 +67,7 @@ public final class UpdatablePipelineRALStatementAssert {
     
     /**
      * Assert updatable pipeline RAL statement is correct with expected parser 
result.
-     * 
+     *
      * @param assertContext assert context
      * @param actual actual updatable pipeline RAL statement
      * @param expected expected updatable pipeline RAL statement test case
@@ -90,6 +96,10 @@ public final class UpdatablePipelineRALStatementAssert {
             StartMigrationCheckStatementAssert.assertIs(assertContext, 
(StartMigrationCheckStatement) actual, (StartMigrationCheckStatementTestCase) 
expected);
         } else if (actual instanceof StopMigrationCheckStatement) {
             StopMigrationCheckStatementAssert.assertIs(assertContext, 
(StopMigrationCheckStatement) actual, (StopMigrationCheckStatementTestCase) 
expected);
+        } else if (actual instanceof RollbackStreamingStatement) {
+            RollbackStreamingStatementAssert.assertIs(assertContext, 
(RollbackStreamingStatement) actual, (RollbackStreamingStatementTestCase) 
expected);
+        } else if (actual instanceof CommitStreamingStatement) {
+            CommitStreamingStatementAssert.assertIs(assertContext, 
(CommitStreamingStatement) actual, (CommitStreamingStatementTestCase) expected);
         }
     }
 }
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
new file mode 100644
index 00000000000..00d4ee6f594
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.JobIdAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
+
+/**
+ * Assertion helper for parsed {@link CommitStreamingStatement} instances.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CommitStreamingStatementAssert {
+    
+    /**
+     * Verify that a parsed commit streaming statement matches its expected test case.
+     *
+     * @param assertContext assert context
+     * @param actual actual commit streaming statement
+     * @param expected expected commit streaming statement test case
+     */
+    public static void assertIs(final SQLCaseAssertContext assertContext, final CommitStreamingStatement actual, final CommitStreamingStatementTestCase expected) {
+        // Job ids are only compared when the existence check returns true.
+        if (!ExistingAssert.assertIs(assertContext, actual, expected)) {
+            return;
+        }
+        JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
+    }
+}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
new file mode 100644
index 00000000000..7d964908698
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.JobIdAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
+
+/**
+ * Assertion helper for parsed {@link RollbackStreamingStatement} instances.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class RollbackStreamingStatementAssert {
+    
+    /**
+     * Verify that a parsed rollback streaming statement matches its expected test case.
+     *
+     * @param assertContext assert context
+     * @param actual actual rollback streaming statement
+     * @param expected expected rollback streaming statement test case
+     */
+    public static void assertIs(final SQLCaseAssertContext assertContext, final RollbackStreamingStatement actual, final RollbackStreamingStatementTestCase expected) {
+        // Job ids are only compared when the existence check returns true.
+        if (!ExistingAssert.assertIs(assertContext, actual, expected)) {
+            return;
+        }
+        JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
+    }
+}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index ab25dfa0abc..0fc44915ed5 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -321,6 +321,8 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTrafficRulesStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTransactionRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
@@ -1017,6 +1019,12 @@ public final class RootSQLParserTestCases {
     @XmlElement(name = "show-streaming-status")
     private final List<ShowStreamingStatusStatementTestCase> 
showStreamingStatusTestCases = new LinkedList<>();
     
+    @XmlElement(name = "rollback-streaming")
+    private final List<RollbackStreamingStatementTestCase> 
rollbackStreamingTestCases = new LinkedList<>();
+    
+    @XmlElement(name = "commit-streaming")
+    private final List<CommitStreamingStatementTestCase> 
commitStreamingTestCases = new LinkedList<>();
+    
     @XmlElement(name = "preview-sql")
     private final List<PreviewStatementTestCase> previewTestCases = new 
LinkedList<>();
     
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
similarity index 61%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
copy to 
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
index 32d299de91f..695cfbd2198 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
@@ -15,18 +15,21 @@
  * limitations under the License.
  */
 
-grammar RALStatement;
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
 
-import BaseRule;
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
 
-showStreamingList
-    : SHOW STREAMING LIST
-    ;
+import javax.xml.bind.annotation.XmlElement;
 
-showStreamingStatus
-    : SHOW STREAMING STATUS jobId
-    ;
-
-jobId
-    : INT_ | IDENTIFIER_ | STRING_
-    ;
+/**
+ * Commit streaming statement test case.
+ *
+ * <p>Maps the {@code job-id} XML element of a test case to the {@code jobId} property;
+ * the accessor and mutator are generated by Lombok.</p>
+ */
+@Getter
+@Setter
+public final class CommitStreamingStatementTestCase extends SQLParserTestCase {
+    
+    @XmlElement(name = "job-id")
+    private String jobId;
+}
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
similarity index 61%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
copy to 
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
index 32d299de91f..b4c20895934 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
@@ -15,18 +15,21 @@
  * limitations under the License.
  */
 
-grammar RALStatement;
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
 
-import BaseRule;
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
 
-showStreamingList
-    : SHOW STREAMING LIST
-    ;
+import javax.xml.bind.annotation.XmlElement;
 
-showStreamingStatus
-    : SHOW STREAMING STATUS jobId
-    ;
-
-jobId
-    : INT_ | IDENTIFIER_ | STRING_
-    ;
+/**
+ * Rollback streaming statement test case.
+ *
+ * <p>Maps the {@code job-id} XML element of a test case to the {@code jobId} property;
+ * the accessor and mutator are generated by Lombok.</p>
+ */
+@Getter
+@Setter
+public final class RollbackStreamingStatementTestCase extends 
SQLParserTestCase {
+    
+    @XmlElement(name = "job-id")
+    private String jobId;
+}
diff --git a/test/it/parser/src/main/resources/case/ral/cdc.xml 
b/test/it/parser/src/main/resources/case/ral/cdc.xml
index c0b8e7620f5..932f5818411 100644
--- a/test/it/parser/src/main/resources/case/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/case/ral/cdc.xml
@@ -22,4 +22,12 @@
     <show-streaming-status sql-case-id="show-streaming-status">
         <job-id>123</job-id>
     </show-streaming-status>
+
+    <rollback-streaming sql-case-id="rollback-streaming">
+        <job-id>123</job-id>
+    </rollback-streaming>
+
+    <commit-streaming sql-case-id="commit-streaming">
+        <job-id>123</job-id>
+    </commit-streaming>
 </sql-parser-test-cases>
diff --git a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml 
b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
index 31af22ca298..2190b38e51c 100644
--- a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
@@ -19,4 +19,6 @@
 <sql-cases>
     <sql-case id="show-streaming-list" value="SHOW STREAMING LIST;" 
db-types="ShardingSphere"/>
     <sql-case id="show-streaming-status" value="SHOW STREAMING STATUS 123;" 
db-types="ShardingSphere"/>
+    <sql-case id="rollback-streaming" value="ROLLBACK STREAMING 123;" 
db-types="ShardingSphere"/>
+    <sql-case id="commit-streaming" value="COMMIT STREAMING 123;" 
db-types="ShardingSphere"/>
 </sql-cases>

Reply via email to