This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e6d1b38787 Improve CDC protocol (#24194)
1e6d1b38787 is described below

commit 1e6d1b38787750247fbd0ad26296f49fe94e3990
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Feb 16 22:08:06 2023 +0800

    Improve CDC protocol (#24194)
    
    * Rename subscription to streaming in pipeline CDC
    
    * Remove database and incremental_global_orderly at CDC request protocol
    
    * Rename subscription_name to streaming_id and remove subscription_mode
    
    * Refactor stream data, start job immediately
    
    * Fix codestyle
    
    * Remove source_schema
    
    * Improve
---
 .../data/pipeline/cdc/client/CDCClient.java        |  11 +-
 .../client/constant/ClientConnectionStatus.java    |   4 +-
 .../client/context/ClientConnectionContext.java    |   4 +-
 ...SubscriptionEvent.java => StreamDataEvent.java} |   4 +-
 ...nRequestHandler.java => CDCRequestHandler.java} |  62 +++++-----
 .../cdc/client/handler/LoginRequestHandler.java    |  19 +--
 .../client/parameter/StartCDCClientParameter.java  |  11 +-
 .../cdc/client/example/opengauss/Bootstrap.java    |   9 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  29 +++--
 ...nJobParameter.java => StreamDataParameter.java} |  10 +-
 .../cdc/config/job/CDCJobConfiguration.java        |   6 +-
 .../pipeline/cdc/constant/CDCConnectionStatus.java |   2 -
 .../pipeline/cdc/context/CDCConnectionContext.java |   2 +
 .../data/pipeline/cdc/core/job/CDCJobId.java       |  11 +-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |   3 +-
 .../cdc/yaml/job/YamlCDCJobConfiguration.java      |   6 +-
 .../yaml/job/YamlCDCJobConfigurationSwapper.java   |   9 +-
 .../data/pipeline/cdc/core/job/CDCJobIdTest.java   |   4 +-
 .../job/YamlCDCJobConfigurationSwapperTest.java    |  11 +-
 .../src/main/proto/CDCRequestProtocol.proto        |  62 ++++------
 .../src/main/proto/CDCResponseProtocol.proto       |   8 +-
 .../backend/handler/cdc/CDCBackendHandler.java     |  98 +++++++--------
 .../backend/handler/cdc/CDCBackendHandlerTest.java |  23 ++--
 .../frontend/netty/CDCChannelInboundHandler.java   | 131 +++++++++++----------
 .../netty/CDCChannelInboundHandlerTest.java        |  13 +-
 25 files changed, 263 insertions(+), 289 deletions(-)

diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index fbb0a5c8f79..d8532932415 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -29,8 +29,8 @@ import io.netty.handler.codec.protobuf.ProtobufEncoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHandler;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.SubscriptionRequestHandler;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 
@@ -57,11 +57,8 @@ public final class CDCClient {
         if (null == parameter.getAddress() || 
parameter.getAddress().isEmpty()) {
             throw new IllegalArgumentException("The address parameter can't be 
null");
         }
-        if (null == parameter.getSubscriptionMode()) {
-            throw new IllegalArgumentException("The subscriptionMode parameter 
can't be null");
-        }
-        if (null == parameter.getSubscribeTables() || 
parameter.getSubscribeTables().isEmpty()) {
-            throw new IllegalArgumentException("The subscribeTables parameter 
can't be null");
+        if (null == parameter.getSchemaTables() || 
parameter.getSchemaTables().isEmpty()) {
+            throw new IllegalArgumentException("The schema tables parameter 
can't be null");
         }
     }
     
@@ -88,7 +85,7 @@ public final class CDCClient {
                         channel.pipeline().addLast(new 
ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
                         channel.pipeline().addLast(new 
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new 
SubscriptionRequestHandler(parameter));
+                        channel.pipeline().addLast(new 
CDCRequestHandler(parameter));
                     }
                 });
         try {
diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
index 55583b42b51..f644e5f59ff 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
@@ -28,7 +28,5 @@ public enum ClientConnectionStatus {
     
     LOGGING_IN,
     
-    CREATING_SUBSCRIPTION,
-    
-    SUBSCRIBING
+    STREAMING
 }
diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
index 77eaaf19854..fcbf086979b 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
@@ -26,10 +26,12 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnect
  * Client connection context.
  */
 @Getter
+@Setter
 public final class ClientConnectionContext {
     
     public static final AttributeKey<ClientConnectionContext> CONTEXT_KEY = 
AttributeKey.valueOf("client.context");
     
-    @Setter
     private volatile ClientConnectionStatus status;
+    
+    private volatile String streamingId;
 }
diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
similarity index 92%
rename from 
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
rename to 
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
index 59bb67e898c..ff0a4198da8 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.client.event;
 
 /**
- * Create subscription event.
+ * Stream data event.
  */
-public final class CreateSubscriptionEvent {
+public final class StreamDataEvent {
 }
diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
similarity index 57%
rename from 
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
rename to 
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index e7288950a8a..2f3367ec8ef 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -22,44 +22,46 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.event.StreamDataEvent;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.importer.DataSourceImporter;
 import org.apache.shardingsphere.data.pipeline.cdc.client.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
 
 import java.util.List;
 
 /**
- * Subscription request handler.
+ * CDC request handler.
  */
 @Slf4j
-public final class SubscriptionRequestHandler extends 
ChannelInboundHandlerAdapter {
+public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
     
     private final StartCDCClientParameter parameter;
     
     private final Importer importer;
     
-    public SubscriptionRequestHandler(final StartCDCClientParameter parameter) 
{
+    public CDCRequestHandler(final StartCDCClientParameter parameter) {
         this.parameter = parameter;
         importer = new DataSourceImporter(parameter.getDatabaseType(), 
parameter.getImportDataSourceParameter());
     }
     
     @Override
     public void userEventTriggered(final ChannelHandlerContext ctx, final 
Object evt) {
-        if (evt instanceof CreateSubscriptionEvent) {
-            CreateSubscriptionRequest createSubscriptionRequest = 
CreateSubscriptionRequest.newBuilder().setDatabase(parameter.getDatabase()).setSubscriptionMode(parameter.getSubscriptionMode())
-                    
.setSubscriptionName(parameter.getSubscriptionName()).addAllTableNames(parameter.getSubscribeTables()).setIncrementalGlobalOrderly(parameter.isIncrementalGlobalOrderly()).build();
-            CDCRequest request = 
CDCRequest.newBuilder().setCreateSubscription(createSubscriptionRequest).setRequestId(RequestIdUtil.generateRequestId()).build();
+        if (evt instanceof StreamDataEvent) {
+            StreamDataRequestBody streamDataRequestBody = 
StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
+                    
.addAllSourceSchemaTables(parameter.getSchemaTables()).build();
+            CDCRequest request = 
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
             ctx.writeAndFlush(request);
         }
     }
@@ -71,33 +73,24 @@ public final class SubscriptionRequestHandler extends 
ChannelInboundHandlerAdapt
             log.error("received error response {}", msg);
         }
         ClientConnectionContext connectionContext = 
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
-        if (connectionContext.getStatus() == 
ClientConnectionStatus.LOGGING_IN) {
-            if (!response.hasCreateSubscriptionResult()) {
-                log.error("not find the create subscription result");
-                return;
-            }
-            sendCreateSubscriptionRequest(ctx, response, connectionContext);
-        } else if (connectionContext.getStatus() == 
ClientConnectionStatus.CREATING_SUBSCRIPTION) {
-            startSubscription(response, connectionContext);
-        } else {
-            subscribeDataRecords(ctx, response.getDataRecordResult());
+        if (response.hasStreamDataResult()) {
+            StreamDataResult streamDataResult = response.getStreamDataResult();
+            
connectionContext.setStreamingId(streamDataResult.getStreamingId());
+            connectionContext.setStatus(ClientConnectionStatus.STREAMING);
+        } else if (response.hasDataRecordResult()) {
+            processDataRecords(ctx, response.getDataRecordResult());
         }
     }
     
-    private void sendCreateSubscriptionRequest(final ChannelHandlerContext 
ctx, final CDCResponse response, final ClientConnectionContext 
connectionContext) {
-        log.info("create subscription succeed, subscription name {}, exist 
{}", response.getCreateSubscriptionResult().getSubscriptionName(), 
response.getCreateSubscriptionResult().getExisting());
-        StartSubscriptionRequest startSubscriptionRequest = 
StartSubscriptionRequest.newBuilder().setDatabase(parameter.getDatabase()).setSubscriptionName(parameter.getSubscriptionName()).build();
-        Builder builder = 
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setStartSubscription(startSubscriptionRequest);
+    // TODO not remove the method, may be used again in the future
+    private void sendStartStreamingDataRequest(final ChannelHandlerContext 
ctx, final String streamingId, final ClientConnectionContext connectionContext) 
{
+        StartStreamingRequestBody startStreamingRequest = 
StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
+        Builder builder = 
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setType(Type.START_STREAMING).setStartStreamingRequestBody(startStreamingRequest);
         ctx.writeAndFlush(builder.build());
-        
connectionContext.setStatus(ClientConnectionStatus.CREATING_SUBSCRIPTION);
-    }
-    
-    private void startSubscription(final CDCResponse response, final 
ClientConnectionContext connectionContext) {
-        log.info("start subscription succeed, subscription name {}", 
response.getCreateSubscriptionResult().getSubscriptionName());
-        connectionContext.setStatus(ClientConnectionStatus.SUBSCRIBING);
+        connectionContext.setStatus(ClientConnectionStatus.STREAMING);
     }
     
-    private void subscribeDataRecords(final ChannelHandlerContext ctx, final 
DataRecordResult result) {
+    private void processDataRecords(final ChannelHandlerContext ctx, final 
DataRecordResult result) {
         List<Record> recordsList = result.getRecordsList();
         for (Record each : recordsList) {
             try {
@@ -108,8 +101,7 @@ public final class SubscriptionRequestHandler extends 
ChannelInboundHandlerAdapt
                 throw new RuntimeException(ex);
             }
         }
-        // TODO data needs to be processed, such as writing to a database
-        
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setAckRequest(AckRequest.newBuilder().setAckId(result.getAckId()).build()).build());
+        
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
     }
     
     @Override
@@ -120,7 +112,7 @@ public final class SubscriptionRequestHandler extends 
ChannelInboundHandlerAdapt
     
     @Override
     public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
-        log.error("subscription handler error", cause);
+        log.error("handler data streaming error", cause);
         // TODO passing error messages to the caller
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index d14df086502..4f06830a576 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -24,13 +24,13 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.event.StreamDataEvent;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.LoginType;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.LoginType;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
@@ -68,7 +68,7 @@ public final class LoginRequestHandler extends 
ChannelInboundHandlerAdapter {
                 setLoginRequest(ctx, response, connectionContext);
                 break;
             case NOT_LOGGED_IN:
-                sendSubscriptionEvent(ctx, response, connectionContext);
+                sendStreamDataEvent(ctx, response, connectionContext);
                 break;
             default:
                 ctx.fireChannelRead(msg);
@@ -79,18 +79,19 @@ public final class LoginRequestHandler extends 
ChannelInboundHandlerAdapter {
         ServerGreetingResult serverGreetingResult = 
response.getServerGreetingResult();
         log.info("Server greeting result, server version: {}, protocol 
version: {}", serverGreetingResult.getServerVersion(), 
serverGreetingResult.getProtocolVersion());
         String encryptPassword = 
Hashing.sha256().hashBytes(password.getBytes()).toString().toUpperCase();
-        LoginRequest loginRequest = 
LoginRequest.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build()).build();
+        LoginRequestBody loginRequestBody = 
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build())
+                .build();
         String loginRequestId = RequestIdUtil.generateRequestId();
-        CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLogin(loginRequest).build();
+        CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLoginRequestBody(loginRequestBody).build();
         ctx.writeAndFlush(data);
         connectionContext.setStatus(ClientConnectionStatus.NOT_LOGGED_IN);
     }
     
-    private void sendSubscriptionEvent(final ChannelHandlerContext ctx, final 
CDCResponse response, final ClientConnectionContext connectionContext) {
+    private void sendStreamDataEvent(final ChannelHandlerContext ctx, final 
CDCResponse response, final ClientConnectionContext connectionContext) {
         if (response.getStatus() == Status.SUCCEED) {
             log.info("login success, username {}", username);
             connectionContext.setStatus(ClientConnectionStatus.LOGGING_IN);
-            ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+            ctx.fireUserEventTriggered(new StreamDataEvent());
         } else {
             log.error("login failed, username {}", username);
         }
diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index a8c916f7b0c..4725a2dbdd3 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -20,8 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 
 import java.util.List;
 
@@ -45,13 +44,9 @@ public final class StartCDCClientParameter {
     
     private String database;
     
-    private List<TableName> subscribeTables;
+    private List<SchemaTable> schemaTables;
     
-    private String subscriptionName;
-    
-    private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
-    
-    private boolean incrementalGlobalOrderly;
+    private boolean full;
     
     private final ImportDataSourceParameter importDataSourceParameter;
 }
diff --git 
a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
 
b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
index b86b112057e..5bafbec1ecf 100644
--- 
a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
+++ 
b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
@@ -20,8 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
 import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 
 import java.util.Collections;
 
@@ -40,10 +39,8 @@ public final class Bootstrap {
         parameter.setUsername("root");
         parameter.setPassword("root");
         parameter.setDatabase("sharding_db");
-        parameter.setSubscriptionMode(SubscriptionMode.FULL);
-        parameter.setSubscriptionName("subscribe_sharding_db");
-        parameter.setIncrementalGlobalOrderly(true);
-        
parameter.setSubscribeTables(Collections.singletonList(TableName.newBuilder().setName("t_order").build()));
+        parameter.setFull(true);
+        
parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
         parameter.setDatabaseType("openGauss");
         CDCClient cdcClient = new CDCClient(parameter);
         cdcClient.start();
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bee9d5c08f6..bf7b9e97c9d 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -45,13 +45,12 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
-import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -107,14 +106,13 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
      * Create CDC job config.
      *
      * @param param create CDC job param
-     * @return true if job is not exist, otherwise false
+     * @return job id
      */
-    public boolean createJob(final CreateSubscriptionJobParameter param) {
+    public String createJob(final StreamDataParameter param) {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
         yamlJobConfig.setDatabase(param.getDatabase());
-        yamlJobConfig.setTableNames(param.getSubscribeTableNames());
-        yamlJobConfig.setSubscriptionName(param.getSubscriptionName());
-        yamlJobConfig.setSubscriptionMode(param.getSubscriptionMode());
+        yamlJobConfig.setSchemaTableNames(param.getSchemaTableNames());
+        yamlJobConfig.setFull(param.isFull());
         yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
         ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabase());
         
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
@@ -130,16 +128,16 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         String jobConfigKey = 
PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
         if (repositoryAPI.isExisted(jobConfigKey)) {
             log.warn("cdc job already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
-            return false;
+            return jobConfig.getJobId();
         }
         
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
 getJobClassName());
         JobConfigurationPOJO jobConfigPOJO = 
convertJobConfiguration(jobConfig);
         jobConfigPOJO.setDisabled(true);
         repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
-        if 
(SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+        if (!param.isFull()) {
             initIncrementalPosition(jobConfig);
         }
-        return true;
+        return jobConfig.getJobId();
     }
     
     private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
@@ -150,7 +148,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
                 if (getJobItemProgress(jobId, i).isPresent()) {
                     continue;
                 }
-                TableNameSchemaNameMapping tableNameSchemaNameMapping = 
getTableNameSchemaNameMapping(jobConfig.getTableNames());
+                TableNameSchemaNameMapping tableNameSchemaNameMapping = 
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
                 DumperConfiguration dumperConfig = 
buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
                 InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
                 
jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
@@ -195,23 +193,24 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     }
     
     private String generateJobId(final YamlCDCJobConfiguration config) {
-        CDCJobId jobId = new CDCJobId(config.getDatabase(), 
config.getSubscriptionName());
+        // TODO generate parameter add sink type
+        CDCJobId jobId = new CDCJobId(config.getDatabase(), 
config.getSchemaTableNames(), config.isFull());
         return marshalJobId(jobId);
     }
     
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
         CDCJobId jobId = (CDCJobId) pipelineJobId;
-        String text = Joiner.on('|').join(jobId.getDatabaseName(), 
jobId.getSubscriptionName());
+        String text = Joiner.on('|').join(jobId.getDatabaseName(), 
jobId.getTableNames(), jobId.isFull());
         return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
     }
     
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
pipelineJobConfig;
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = 
getTableNameSchemaNameMapping(jobConfig.getTableNames());
+        TableNameSchemaNameMapping tableNameSchemaNameMapping = 
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
         DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, 
jobShardingItem, tableNameSchemaNameMapping);
-        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
jobConfig.getTableNames(), tableNameSchemaNameMapping);
+        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping);
         CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig, 
importerConfig);
         log.debug("buildTaskConfiguration, result={}", result);
         return result;
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
similarity index 84%
rename from 
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
rename to 
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
index 79c30206e54..a62bf5cda31 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
@@ -25,19 +25,17 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Create subscription job parameter.
+ * Stream data parameter.
  */
 @RequiredArgsConstructor
 @Getter
-public final class CreateSubscriptionJobParameter {
+public final class StreamDataParameter {
     
     private final String database;
     
-    private final List<String> subscribeTableNames;
+    private final List<String> schemaTableNames;
     
-    private final String subscriptionName;
-    
-    private final String subscriptionMode;
+    private final boolean full;
     
     private final Map<String, List<DataNode>> dataNodesMap;
     
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index d4c628ac874..7a04f3d7bca 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -36,11 +36,9 @@ public final class CDCJobConfiguration implements 
PipelineJobConfiguration {
     
     private final String database;
     
-    private final List<String> tableNames;
+    private final List<String> schemaTableNames;
     
-    private final String subscriptionName;
-    
-    private final String subscriptionMode;
+    private final boolean full;
     
     private final String sourceDatabaseType;
     
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
index 09ce0e484db..4db5e6a54e6 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
@@ -25,6 +25,4 @@ public enum CDCConnectionStatus {
     NOT_LOGGED_IN,
     
     LOGGED_IN,
-    
-    SUBSCRIBED
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
index 8abb4a70f73..51b689790bf 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
@@ -31,6 +31,8 @@ public final class CDCConnectionContext {
     
     private volatile CDCConnectionStatus status;
     
+    private volatile String database;
+    
     private volatile String jobId;
     
     private volatile ShardingSphereUser currentUser;
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
index e838129b8b0..0182c35a74b 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
@@ -22,6 +22,8 @@ import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
 
+import java.util.List;
+
 /**
  * CDC job id.
  */
@@ -33,11 +35,14 @@ public final class CDCJobId extends AbstractPipelineJobId {
     
     private final String databaseName;
     
-    private final String subscriptionName;
+    private final List<String> tableNames;
+    
+    private final boolean full;
     
-    public CDCJobId(final String databaseName, final String subscriptionName) {
+    public CDCJobId(final String databaseName, final List<String> tableNames, 
final boolean full) {
         super(new CDCJobType(), CURRENT_VERSION);
         this.databaseName = databaseName;
-        this.subscriptionName = subscriptionName;
+        this.tableNames = tableNames;
+        this.full = full;
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index cbcee655918..8f789c6c2bf 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
@@ -70,7 +69,7 @@ public final class CDCJobPreparer {
         }
         initIncrementalTasks(jobItemContext);
         CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
-        if 
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+        if (jobConfig.isFull()) {
             initInventoryTasks(jobItemContext);
         }
         if (needUpdateJobStatus) {
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index 9a02e7b853e..2d75a23931c 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -35,11 +35,9 @@ public final class YamlCDCJobConfiguration implements 
YamlPipelineJobConfigurati
     
     private String database;
     
-    private List<String> tableNames;
+    private List<String> schemaTableNames;
     
-    private String subscriptionName;
-    
-    private String subscriptionMode;
+    private boolean full;
     
     private String sourceDatabaseType;
     
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 14276bbed7b..8555a1e765c 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -40,9 +40,8 @@ public final class YamlCDCJobConfigurationSwapper implements 
YamlConfigurationSw
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
         result.setJobId(data.getJobId());
         result.setDatabase(data.getDatabase());
-        result.setTableNames(data.getTableNames());
-        result.setSubscriptionName(data.getSubscriptionName());
-        result.setSubscriptionMode(data.getSubscriptionMode());
+        result.setSchemaTableNames(data.getSchemaTableNames());
+        result.setFull(data.isFull());
         result.setSourceDatabaseType(data.getSourceDatabaseType());
         
result.setDataSourceConfiguration(dataSourceConfigSwapper.swapToYamlConfiguration(data.getDataSourceConfig()));
         result.setTablesFirstDataNodes(null == data.getTablesFirstDataNodes() 
? null : data.getTablesFirstDataNodes().marshal());
@@ -60,8 +59,8 @@ public final class YamlCDCJobConfigurationSwapper implements 
YamlConfigurationSw
                 ? Collections.emptyList()
                 : 
yamlConfig.getJobShardingDataNodes().stream().map(JobDataNodeLine::unmarshal).collect(Collectors.toList());
         JobDataNodeLine tablesFirstDataNodes = null == 
yamlConfig.getTablesFirstDataNodes() ? null : 
JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
-        return new CDCJobConfiguration(yamlConfig.getJobId(), 
yamlConfig.getDatabase(), yamlConfig.getTableNames(), 
yamlConfig.getSubscriptionName(), yamlConfig.getSubscriptionMode(),
-                yamlConfig.getSourceDatabaseType(), 
(ShardingSpherePipelineDataSourceConfiguration) 
dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()), 
tablesFirstDataNodes,
+        return new CDCJobConfiguration(yamlConfig.getJobId(), 
yamlConfig.getDatabase(), yamlConfig.getSchemaTableNames(), 
yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
+                (ShardingSpherePipelineDataSourceConfiguration) 
dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()), 
tablesFirstDataNodes,
                 jobShardingDataNodes, yamlConfig.isDecodeWithTX(), 
yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
     
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index a06730170bd..408610f365a 100644
--- 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -22,6 +22,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -29,7 +31,7 @@ public final class CDCJobIdTest {
     
     @Test
     public void parseJobType() {
-        CDCJobId pipelineJobId = new CDCJobId("sharding_db", "test");
+        CDCJobId pipelineJobId = new CDCJobId("sharding_db", 
Arrays.asList("test", "t_order"), false);
         String jobId = 
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
         JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
         assertThat(actualJobType, instanceOf(CDCJobType.class));
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index 51ab34e39dc..c127a9e8798 100644
--- 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public final class YamlCDCJobConfigurationSwapperTest {
     
@@ -32,14 +33,12 @@ public final class YamlCDCJobConfigurationSwapperTest {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
         yamlJobConfig.setJobId("j51017f973ac82cb1edea4f5238a258c25e89");
         yamlJobConfig.setDatabase("test_db");
-        yamlJobConfig.setTableNames(Arrays.asList("t_order", "t_order_item"));
-        yamlJobConfig.setSubscriptionName("test_name");
-        yamlJobConfig.setSubscriptionMode("FULL");
+        yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order", 
"t_order_item"));
+        yamlJobConfig.setFull(true);
         CDCJobConfiguration actual = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
         assertThat(actual.getJobId(), 
is("j51017f973ac82cb1edea4f5238a258c25e89"));
         assertThat(actual.getDatabase(), is("test_db"));
-        assertThat(actual.getTableNames(), is(Arrays.asList("t_order", 
"t_order_item")));
-        assertThat(actual.getSubscriptionName(), is("test_name"));
-        assertThat(actual.getSubscriptionMode(), is("FULL"));
+        assertThat(actual.getSchemaTableNames(), 
is(Arrays.asList("test.t_order", "t_order_item")));
+        assertTrue(actual.isFull());
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto 
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 00b3c76c452..02a40d52f19 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -27,24 +27,24 @@ message CDCRequest {
   enum Type {
     UNKNOWN = 0;
     LOGIN = 1;
-    CREATE_SUBSCRIPTION = 2;
-    START_SUBSCRIPTION = 3;
-    ACK = 4;
-    STOP_SUBSCRIPTION = 5;
-    DROP_SUBSCRIPTION = 6;
+    STREAM_DATA = 2;
+    ACK_STREAMING = 3;
+    STOP_STREAMING = 4;
+    START_STREAMING = 5;
+    DROP_STREAMING = 6;
   }
   Type type = 3;
-  oneof request {
-    LoginRequest login = 4;
-    CreateSubscriptionRequest create_subscription = 5;
-    StartSubscriptionRequest start_subscription = 6;
-    AckRequest ack_request = 7;
-    StopSubscriptionRequest stop_subscription = 8;
-    DropSubscriptionRequest drop_subscription = 9;
+  oneof request_body {
+    LoginRequestBody login_request_body = 4;
+    StreamDataRequestBody stream_data_request_body = 5;
+    AckStreamingRequestBody ack_streaming_request_body = 6;
+    StopStreamingRequestBody stop_streaming_request_body = 7;
+    StartStreamingRequestBody start_streaming_request_body = 8;
+    DropStreamingRequestBody drop_streaming_request_body = 9;
   }
 }
 
-message LoginRequest {
+message LoginRequestBody {
   enum LoginType {
     UNKNOWN = 0;
     BASIC = 1;
@@ -60,38 +60,28 @@ message LoginRequest {
   }
 }
 
-message CreateSubscriptionRequest {
+message StreamDataRequestBody {
   string database = 1;
-  message TableName {
+  message SchemaTable {
     string schema = 1;
-    string name = 2;
+    string table = 2;
   }
-  repeated TableName table_names = 2;
-  string subscription_name = 3;
-  enum SubscriptionMode {
-    UNKNOWN = 0;
-    INCREMENTAL = 1;
-    FULL = 2;
-  }
-  SubscriptionMode subscription_mode = 4;
-  bool incremental_global_orderly = 5;
+  repeated SchemaTable source_schema_tables = 2;
+  bool full = 3;
 }
 
-message StartSubscriptionRequest {
-  string database = 1;
-  string subscription_name = 2;
+message AckStreamingRequestBody {
+  string ack_id = 1;
 }
 
-message StopSubscriptionRequest {
-  string database = 1;
-  string subscription_name = 2;
+message StopStreamingRequestBody {
+  string streaming_id = 1;
 }
 
-message DropSubscriptionRequest {
-  string database = 1;
-  string subscription_name = 2;
+message StartStreamingRequestBody {
+  string streaming_id = 1;
 }
 
-message AckRequest {
-  string ack_id = 3;
+message DropStreamingRequestBody {
+  string streaming_id = 1;
 }
diff --git 
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto 
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
index dd9a8158918..56b5969d9b8 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -33,10 +33,9 @@ message CDCResponse {
   Status status = 2;
   oneof response {
     ServerGreetingResult server_greeting_result = 3;
-    CreateSubscriptionResult create_subscription_result = 4;
+    StreamDataResult stream_data_result = 4;
     DataRecordResult data_record_result = 5;
   }
-
   optional string error_code = 14;
   optional string error_message = 15;
 }
@@ -46,9 +45,8 @@ message ServerGreetingResult {
   string protocol_version = 2;
 }
 
-message CreateSubscriptionResult {
-  string subscription_name = 1;
-  bool existing = 2;
+message StreamDataResult {
+  string streaming_id = 1;
 }
 
 message NullValue {
diff --git 
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
 
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 1137e669816..8707da0276a 100644
--- 
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ 
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -22,29 +22,26 @@ import io.netty.channel.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
 import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
-import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
 import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
 import 
org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CreateSubscriptionResult;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
@@ -67,39 +64,38 @@ public final class CDCBackendHandler {
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
     /**
-     * Create subscription.
+     * Stream data.
      *
-     * @param request CDC request
+     * @param requestId request id
+     * @param requestBody stream data request body
+     * @param connectionContext connection context
+     * @param channel channel
      * @return CDC response
      */
-    public CDCResponse createSubscription(final CDCRequest request) {
-        CreateSubscriptionRequest createSubscription = 
request.getCreateSubscription();
-        ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(createSubscription.getDatabase());
+    public CDCResponse streamData(final String requestId, final 
StreamDataRequestBody requestBody, final CDCConnectionContext 
connectionContext, final Channel channel) {
+        ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
         if (null == database) {
-            return CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", 
createSubscription.getDatabase()));
+            return CDCResponseGenerator.failed(requestId, 
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", 
requestBody.getDatabase()));
         }
-        List<String> tableNames = new LinkedList<>();
-        for (TableName each : createSubscription.getTableNamesList()) {
-            tableNames.add(Strings.isNullOrEmpty(each.getSchema()) ? 
each.getName() : String.join(".", each.getSchema(), each.getName()));
+        List<String> schemaTableNames = new LinkedList<>();
+        for (SchemaTable each : requestBody.getSourceSchemaTablesList()) {
+            schemaTableNames.add(Strings.isNullOrEmpty(each.getSchema()) ? 
each.getTable() : String.join(".", each.getSchema(), each.getTable()));
         }
         Optional<ShardingRule> shardingRule = 
database.getRuleMetaData().findSingleRule(ShardingRule.class);
         if (!shardingRule.isPresent()) {
-            return CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
+            return CDCResponseGenerator.failed(requestId, 
CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
         }
         Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
         // TODO need support case-insensitive later
-        for (TableName each : createSubscription.getTableNamesList()) {
-            actualDataNodesMap.put(each.getName(), 
getActualDataNodes(shardingRule.get(), each.getName()));
-        }
-        CreateSubscriptionJobParameter parameter = new 
CreateSubscriptionJobParameter(createSubscription.getDatabase(), tableNames, 
createSubscription.getSubscriptionName(),
-                createSubscription.getSubscriptionMode().name(), 
actualDataNodesMap, createSubscription.getIncrementalGlobalOrderly());
-        if (jobAPI.createJob(parameter)) {
-            return 
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
-                    
.setSubscriptionName(createSubscription.getSubscriptionName()).setExisting(false).build()).build();
-        } else {
-            return 
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
-                    
.setSubscriptionName(createSubscription.getSubscriptionName()).setExisting(true).build()).build();
+        for (SchemaTable each : requestBody.getSourceSchemaTablesList()) {
+            actualDataNodesMap.put(each.getTable(), 
getActualDataNodes(shardingRule.get(), each.getTable()));
         }
+        boolean decodeWithTx = database.getProtocolType() instanceof 
OpenGaussDatabaseType;
+        StreamDataParameter parameter = new 
StreamDataParameter(requestBody.getDatabase(), schemaTableNames, 
requestBody.getFull(), actualDataNodesMap, decodeWithTx);
+        String jobId = jobAPI.createJob(parameter);
+        connectionContext.setJobId(jobId);
+        startStreaming(requestId, jobId, connectionContext, channel);
+        return 
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
     private List<DataNode> getActualDataNodes(final ShardingRule shardingRule, 
final String logicTableName) {
@@ -109,20 +105,29 @@ public final class CDCBackendHandler {
     }
     
     /**
-     * Start subscription.
+     * Get database by job id.
      *
-     * @param request request
+     * @param jobId job id
+     * @return database
+     */
+    public String getDatabaseByJobId(final String jobId) {
+        return ((CDCJobConfiguration) 
jobAPI.getJobConfiguration(jobId)).getDatabase();
+    }
+    
+    /**
+     * Start streaming.
+     *
+     * @param requestId request id
+     * @param jobId job id
      * @param channel channel
      * @param connectionContext connection context
      * @return CDC response
      */
-    public CDCResponse startSubscription(final CDCRequest request, final 
Channel channel, final CDCConnectionContext connectionContext) {
-        StartSubscriptionRequest startSubscriptionRequest = 
request.getStartSubscription();
-        String jobId = jobAPI.marshalJobId(new 
CDCJobId(startSubscriptionRequest.getDatabase(), 
startSubscriptionRequest.getSubscriptionName()));
+    // TODO not return CDCResponse
+    public CDCResponse startStreaming(final String requestId, final String 
jobId, final CDCConnectionContext connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) 
jobAPI.getJobConfiguration(jobId);
         if (null == cdcJobConfig) {
-            return CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config 
doesn't exist",
-                    startSubscriptionRequest.getSubscriptionName()));
+            return CDCResponseGenerator.failed(jobId, 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config 
doesn't exist", jobId));
         }
         JobConfigurationPOJO jobConfigPOJO = 
PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
         // TODO, ensure that there is only one consumer at a time, job config 
disable may not be updated when the program is forced to close
@@ -132,22 +137,21 @@ public final class CDCBackendHandler {
         Comparator<DataRecord> dataRecordComparator = 
cdcJobConfig.isDecodeWithTX()
                 ? 
DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
                 : null;
-        CDCJob job = new CDCJob(new CDCImporterConnector(channel, 
cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), 
cdcJobConfig.getTableNames(), dataRecordComparator));
+        CDCJob job = new CDCJob(new CDCImporterConnector(channel, 
cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(), 
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, 
jobConfigPOJO.toJobConfiguration());
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
-        connectionContext.setStatus(CDCConnectionStatus.SUBSCRIBED);
         connectionContext.setJobId(jobId);
-        return 
CDCResponseGenerator.succeedBuilder(request.getRequestId()).build();
+        return CDCResponseGenerator.succeedBuilder(requestId).build();
     }
     
     /**
-     * Stop subscription.
+     * Stop streaming.
      *
      * @param jobId job id
      */
-    public void stopSubscription(final String jobId) {
+    public void stopStreaming(final String jobId) {
         if (Strings.isNullOrEmpty(jobId)) {
             log.warn("job id is null or empty, ignored");
             return;
@@ -159,21 +163,21 @@ public final class CDCBackendHandler {
     }
     
     /**
-     * Drop subscription.
+     * Drop streaming.
      *
      * @param jobId job id.
      * @throws SQLException sql exception
      */
-    public void dropSubscription(final String jobId) throws SQLException {
+    public void dropStreaming(final String jobId) throws SQLException {
         jobAPI.rollback(jobId);
     }
     
     /**
      * Process ack.
      *
-     * @param ackRequest ack request
+     * @param requestBody request body
      */
-    public void processAck(final AckRequest ackRequest) {
-        CDCAckHolder.getInstance().ack(ackRequest.getAckId());
+    public void processAck(final AckStreamingRequestBody requestBody) {
+        CDCAckHolder.getInstance().ack(requestBody.getAckId());
     }
 }
diff --git 
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
 
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
index c650d2f6ced..6bd48cb8efa 100644
--- 
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
+++ 
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.shardingsphere.proxy.backend.handler.cdc;
 
+import io.netty.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -35,7 +38,6 @@ import 
org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.MockedStatic;
 
@@ -84,20 +86,9 @@ public final class CDCBackendHandlerTest {
     }
     
     @Test
-    public void assertCreateSubscriptionFailed() {
-        CDCRequest request = 
CDCRequest.newBuilder().setRequestId("1").setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("none")).build();
-        CDCResponse actualResponse = handler.createSubscription(request);
+    public void assertStreamDataRequestFailed() {
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId("1").setType(Type.STREAM_DATA).setStreamDataRequestBody(StreamDataRequestBody.newBuilder().setDatabase("none")).build();
+        CDCResponse actualResponse = 
handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), 
mock(CDCConnectionContext.class), mock(Channel.class));
         assertThat(actualResponse.getStatus(), is(Status.FAILED));
     }
-    
-    // TODO ignore for now, it need more mock, since SPI is removed. It's 
better to put it in E2E test
-    @Ignore
-    @Test
-    public void assertCreateSubscriptionSucceed() {
-        String requestId = "1";
-        CDCRequest request = 
CDCRequest.newBuilder().setRequestId(requestId).setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("sharding_db")).build();
-        CDCResponse actualResponse = handler.createSubscription(request);
-        assertThat(actualResponse.getStatus(), is(Status.SUCCEED));
-        assertThat(actualResponse.getRequestId(), is(requestId));
-    }
 }
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index e07a1a50a60..1c9a9859269 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -30,14 +30,13 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
 import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropSubscriptionRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopSubscriptionRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
 import 
org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
@@ -80,7 +79,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
     public void channelInactive(final ChannelHandlerContext ctx) {
         CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
         if (null != connectionContext.getJobId()) {
-            backendHandler.stopSubscription(connectionContext.getJobId());
+            backendHandler.stopStreaming(connectionContext.getJobId());
         }
         ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
     }
@@ -112,33 +111,33 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
             processLogin(ctx, request, connectionContext);
             return;
         }
-        switch (request.getRequestCase()) {
-            case CREATE_SUBSCRIPTION:
-                processCreateSubscription(ctx, request, connectionContext);
+        switch (request.getType()) {
+            case STREAM_DATA:
+                processStreamDataRequest(ctx, request, connectionContext);
                 break;
-            case START_SUBSCRIPTION:
-                processStartSubscription(ctx, request, connectionContext);
+            case ACK_STREAMING:
+                processAckStreamingRequest(ctx, request);
                 break;
-            case STOP_SUBSCRIPTION:
-                processStopSubscription(ctx, request, connectionContext);
+            case STOP_STREAMING:
+                processStopStreamingRequest(ctx, request, connectionContext);
                 break;
-            case DROP_SUBSCRIPTION:
-                processDropSubscription(ctx, request, connectionContext);
+            case START_STREAMING:
+                processStartStreamingRequest(ctx, request, connectionContext);
                 break;
-            case ACK_REQUEST:
-                processAckRequest(ctx, request);
+            case DROP_STREAMING:
+                processDropStreamingRequest(ctx, request, connectionContext);
                 break;
             default:
-                log.warn("Cannot handle this type of request {}", request);
+                log.warn("can't handle this type of request {}", request);
         }
     }
     
     private void processLogin(final ChannelHandlerContext ctx, final 
CDCRequest request, final CDCConnectionContext connectionContext) {
-        if (!request.hasLogin() || !request.getLogin().hasBasicBody()) {
+        if (!request.hasLoginRequestBody() || 
!request.getLoginRequestBody().hasBasicBody()) {
             
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request 
body")).addListener(ChannelFutureListener.CLOSE);
             return;
         }
-        BasicBody body = request.getLogin().getBasicBody();
+        BasicBody body = request.getLoginRequestBody().getBasicBody();
         AuthorityRule authorityRule = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
         Optional<ShardingSphereUser> user = authorityRule.findUser(new 
Grantee(body.getUsername(), getHostAddress(ctx)));
         if (user.isPresent() && 
Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(),
 body.getPassword())) {
@@ -164,69 +163,79 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         return socketAddress instanceof InetSocketAddress ? 
((InetSocketAddress) socketAddress).getAddress().getHostAddress() : 
socketAddress.toString();
     }
     
-    private void processCreateSubscription(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
-        if (!request.hasCreateSubscription()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss create subscription request 
body"))
-                    .addListener(ChannelFutureListener.CLOSE);
+    private void processStreamDataRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
+        if (!request.hasStreamDataRequestBody()) {
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
+            return;
+        }
+        StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
+        if (requestBody.getDatabase().isEmpty()) {
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be 
empty"));
             return;
         }
-        CreateSubscriptionRequest createSubscriptionRequest = 
request.getCreateSubscription();
-        if (createSubscriptionRequest.getTableNamesList().isEmpty() || 
createSubscriptionRequest.getDatabase().isEmpty() || 
createSubscriptionRequest.getSubscriptionName().isEmpty()
-                || createSubscriptionRequest.getSubscriptionMode() == 
SubscriptionMode.UNKNOWN) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription 
request parameter"));
+        // TODO need support the all tables at database or schema
+        if (requestBody.getSourceSchemaTablesList().isEmpty()) {
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request 
parameter"));
             return;
         }
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
createSubscriptionRequest.getDatabase());
-        CDCResponse response = backendHandler.createSubscription(request);
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
requestBody.getDatabase());
+        CDCResponse response = 
backendHandler.streamData(request.getRequestId(), requestBody, 
connectionContext, ctx.channel());
         ctx.writeAndFlush(response);
     }
     
-    private void processStartSubscription(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
-        if (!request.hasStartSubscription()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start subscription request 
body"))
+    private void processAckStreamingRequest(final ChannelHandlerContext ctx, 
final CDCRequest request) {
+        if (!request.hasAckStreamingRequestBody()) {
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss ack request 
body")).addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        AckStreamingRequestBody requestBody = 
request.getAckStreamingRequestBody();
+        if (requestBody.getAckId().isEmpty()) {
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal ack request parameter"));
+            return;
+        }
+        backendHandler.processAck(requestBody);
+    }
+    
+    private void processStartStreamingRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
+        if (!request.hasStartStreamingRequestBody()) {
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start streaming request 
body"))
                     .addListener(ChannelFutureListener.CLOSE);
             return;
         }
-        StartSubscriptionRequest startSubscriptionRequest = 
request.getStartSubscription();
-        if (startSubscriptionRequest.getDatabase().isEmpty() || 
startSubscriptionRequest.getSubscriptionName().isEmpty()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start subscription request 
parameter"))
+        StartStreamingRequestBody requestBody = 
request.getStartStreamingRequestBody();
+        // TODO improve after cdc exception refactor
+        if (requestBody.getStreamingId().isEmpty()) {
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start streaming request 
parameter"))
                     .addListener(ChannelFutureListener.CLOSE);
             return;
         }
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
startSubscriptionRequest.getDatabase());
-        CDCResponse response = backendHandler.startSubscription(request, 
ctx.channel(), connectionContext);
+        String database = 
backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
database);
+        CDCResponse response = 
backendHandler.startStreaming(request.getRequestId(), 
requestBody.getStreamingId(), connectionContext, ctx.channel());
         ctx.writeAndFlush(response);
     }
     
-    private void processStopSubscription(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
-        StopSubscriptionRequest stopSubscriptionRequest = 
request.getStopSubscription();
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
stopSubscriptionRequest.getDatabase());
-        backendHandler.stopSubscription(connectionContext.getJobId());
+    private void processStopStreamingRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
+        StopStreamingRequestBody requestBody = 
request.getStopStreamingRequestBody();
+        String database = 
backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
database);
+        backendHandler.stopStreaming(connectionContext.getJobId());
         connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+        connectionContext.setJobId(null);
         
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }
     
-    private void processDropSubscription(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
-        DropSubscriptionRequest dropSubscriptionRequest = 
request.getDropSubscription();
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
dropSubscriptionRequest.getDatabase());
+    private void processDropStreamingRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
+        DropStreamingRequestBody requestBody = 
request.getDropStreamingRequestBody();
+        String database = 
backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
database);
         try {
-            backendHandler.dropSubscription(connectionContext.getJobId());
+            backendHandler.dropStreaming(connectionContext.getJobId());
+            connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+            connectionContext.setJobId(null);
             
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
         } catch (final SQLException ex) {
             
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.SERVER_ERROR, ex.getMessage()));
         }
     }
-    
-    private void processAckRequest(final ChannelHandlerContext ctx, final 
CDCRequest request) {
-        if (!request.hasAckRequest()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss ack request 
body")).addListener(ChannelFutureListener.CLOSE);
-            return;
-        }
-        AckRequest ackRequest = request.getAckRequest();
-        if (ackRequest.getAckId().isEmpty()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal ack request parameter"));
-            return;
-        }
-        backendHandler.processAck(ackRequest);
-    }
 }
diff --git 
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
 
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index 3942246d752..603f4f0468c 100644
--- 
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++ 
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -24,8 +24,9 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule;
 import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
@@ -82,7 +83,8 @@ public final class CDCChannelInboundHandlerTest {
     
     @Test
     public void assertLoginRequestFailed() {
-        CDCRequest actualRequest = 
CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build()).build()).build();
+        CDCRequest actualRequest = 
CDCRequest.newBuilder().setType(Type.LOGIN).setLoginRequestBody(LoginRequestBody.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build())
+                .build()).build();
         channel.writeInbound(actualRequest);
         CDCResponse expectedGreetingResult = channel.readOutbound();
         assertTrue(expectedGreetingResult.hasServerGreetingResult());
@@ -94,7 +96,7 @@ public final class CDCChannelInboundHandlerTest {
     
     @Test
     public void assertIllegalLoginRequest() {
-        CDCRequest actualRequest = 
CDCRequest.newBuilder().setVersion(1).setRequestId("test").build();
+        CDCRequest actualRequest = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId("test").build();
         channel.writeInbound(actualRequest);
         CDCResponse expectedGreetingResult = channel.readOutbound();
         assertTrue(expectedGreetingResult.hasServerGreetingResult());
@@ -107,7 +109,8 @@ public final class CDCChannelInboundHandlerTest {
     @Test
     public void assertLoginRequestSucceed() {
         String encryptPassword = 
Hashing.sha256().hashBytes("root".getBytes()).toString().toUpperCase();
-        Builder builder = 
CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root").setPassword(encryptPassword).build()).build());
+        Builder builder = 
CDCRequest.newBuilder().setType(Type.LOGIN).setLoginRequestBody(LoginRequestBody.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root")
+                .setPassword(encryptPassword).build()).build());
         CDCRequest actualRequest = builder.build();
         channel.writeInbound(actualRequest);
         CDCResponse expectedGreetingResult = channel.readOutbound();

Reply via email to