This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 440536943cc Add CDC job start subscription and add client connection status. (#23026)
440536943cc is described below

commit 440536943ccad254425c25aadb1b491ffd9d1670
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Dec 26 12:24:23 2022 +0800

    Add CDC job start subscription and add client connection status. (#23026)
    
    * Add CDC job start subscription and add client connection status.
    
    * Fix codestyle
    
    * Remove blank line
---
 .../spi/importer/ImporterCreatorFactory.java       |  6 +-
 .../client/constant/ClientConnectionStatus.java}   | 22 +++---
 .../client/context/ClientConnectionContext.java}   | 13 ++--
 .../cdc/client/handler/LoginRequestHandler.java    | 60 ++++++++--------
 .../client/handler/SubscriptionRequestHandler.java | 41 ++++++++---
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  4 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPIImpl.java  | 22 +++++-
 .../pipeline/cdc/context/CDCConnectionContext.java |  3 +
 .../cdc/context/job/CDCJobItemContext.java         |  5 +-
 .../pipeline/cdc/core/importer/CDCImporter.java    | 23 +++++++
 .../cdc/core/importer/CDCImporterCreator.java      |  2 +-
 .../importer/connector/CDCImporterConnector.java   |  8 ++-
 .../data/pipeline/cdc/core/job/CDCJob.java         |  5 +-
 .../CDCJobConfigurationChangedProcessor.java       | 79 ----------------------
 ...andler.PipelineChangedJobConfigurationProcessor | 18 -----
 .../cdc/core/importer/CDCImporterCreatorTest.java  |  5 +-
 .../src/main/proto/CDCRequestProtocol.proto        | 14 ++--
 .../src/main/proto/CDCResponseProtocol.proto       |  4 +-
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  | 10 +--
 ...ineChangedJobConfigurationProcessorFactory.java |  6 +-
 .../impl/ChangedJobConfigurationDispatcher.java    |  5 +-
 .../backend/handler/cdc/CDCBackendHandler.java     | 72 +++++++++++++++++++-
 .../handler/cdc/fixture/FixtureCDCJobAPI.java      |  4 +-
 .../frontend/netty/CDCChannelInboundHandler.java   | 27 ++++++--
 ...hangedJobConfigurationProcessorFactoryTest.java | 13 +++-
 25 files changed, 277 insertions(+), 194 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
index 35e85f0d5f7..0445ec72db2 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
@@ -35,10 +35,10 @@ public final class ImporterCreatorFactory {
     /**
      * Get importer creator instance.
      *
-     * @param databaseType database type
+     * @param importType import type
      * @return importer creator
      */
-    public static ImporterCreator getInstance(final String databaseType) {
-        return TypedSPIRegistry.getRegisteredService(ImporterCreator.class, 
databaseType);
+    public static ImporterCreator getInstance(final String importType) {
+        return TypedSPIRegistry.getRegisteredService(ImporterCreator.class, 
importType);
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
similarity index 71%
copy from 
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
copy to 
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
index 2b74ddfd816..55583b42b51 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
@@ -15,18 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.context;
-
-import lombok.Getter;
-import lombok.Setter;
-import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+package org.apache.shardingsphere.data.pipeline.cdc.client.constant;
 
 /**
- * CDC connection context.
+ * Client connection status.
  */
-@Getter
-public final class CDCConnectionContext {
+public enum ClientConnectionStatus {
+    
+    CONNECTED,
+    
+    NOT_LOGGED_IN,
+    
+    LOGGING_IN,
+    
+    CREATING_SUBSCRIPTION,
     
-    @Setter
-    private volatile CDCConnectionStatus status;
+    SUBSCRIBING
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
similarity index 66%
copy from 
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
copy to 
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
index 2b74ddfd816..77eaaf19854 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
@@ -15,18 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.context;
+package org.apache.shardingsphere.data.pipeline.cdc.client.context;
 
+import io.netty.util.AttributeKey;
 import lombok.Getter;
 import lombok.Setter;
-import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
 
 /**
- * CDC connection context.
+ * Client connection context.
  */
 @Getter
-public final class CDCConnectionContext {
+public final class ClientConnectionContext {
+    
+    public static final AttributeKey<ClientConnectionContext> CONTEXT_KEY = 
AttributeKey.valueOf("client.context");
     
     @Setter
-    private volatile CDCConnectionStatus status;
+    private volatile ClientConnectionStatus status;
 }
diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index 10974d5e6d9..2372f37fc86 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -20,9 +20,10 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.client.handler;
 import com.google.common.hash.Hashing;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.AttributeKey;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
@@ -34,8 +35,6 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
 
-import java.util.Objects;
-
 /**
  * Login request handler.
  */
@@ -43,50 +42,57 @@ import java.util.Objects;
 @RequiredArgsConstructor
 public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
     
-    private static final AttributeKey<String> LOGIN_REQUEST_ID_KEY = 
AttributeKey.valueOf("login.request.id");
-    
     private final String username;
     
     private final String password;
     
-    private boolean loggedIn;
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) {
+        ClientConnectionContext context = new ClientConnectionContext();
+        context.setStatus(ClientConnectionStatus.CONNECTED);
+        
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(context);
+    }
     
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) {
-        ctx.channel().attr(LOGIN_REQUEST_ID_KEY).set(null);
+        
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(null);
     }
     
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
-        if (loggedIn) {
-            ctx.fireChannelRead(msg);
-            return;
-        }
         CDCResponse response = (CDCResponse) msg;
-        if (response.hasServerGreetingResult()) {
-            ServerGreetingResult serverGreetingResult = 
response.getServerGreetingResult();
-            log.info("Server greeting result, server version: {}, protocol 
version: {}", serverGreetingResult.getServerVersion(), 
serverGreetingResult.getProtocolVersion());
-            sendLoginRequest(ctx);
-            return;
-        }
-        if (Status.FAILED == response.getStatus()) {
-            log.error("login failed, {}", msg);
-            return;
-        }
-        if (Status.SUCCEED == response.getStatus() && 
Objects.equals(ctx.channel().attr(LOGIN_REQUEST_ID_KEY).get(), 
response.getRequestId())) {
-            log.info("login success, username {}", username);
-            loggedIn = true;
-            ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+        ClientConnectionContext connectionContext = 
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
+        switch (connectionContext.getStatus()) {
+            case CONNECTED:
+                setLogindRequest(ctx, response, connectionContext);
+                break;
+            case NOT_LOGGED_IN:
+                sendSubscriptionEvent(ctx, response, connectionContext);
+                break;
+            default:
+                ctx.fireChannelRead(msg);
         }
     }
     
-    private void sendLoginRequest(final ChannelHandlerContext ctx) {
+    private void setLogindRequest(final ChannelHandlerContext ctx, final 
CDCResponse response, final ClientConnectionContext connectionContext) {
+        ServerGreetingResult serverGreetingResult = 
response.getServerGreetingResult();
+        log.info("Server greeting result, server version: {}, protocol 
version: {}", serverGreetingResult.getServerVersion(), 
serverGreetingResult.getProtocolVersion());
         String encryptPassword = 
Hashing.sha256().hashBytes(password.getBytes()).toString().toUpperCase();
         LoginRequest loginRequest = 
LoginRequest.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build()).build();
         String loginRequestId = RequestIdUtil.generateRequestId();
-        ctx.channel().attr(LOGIN_REQUEST_ID_KEY).setIfAbsent(loginRequestId);
         CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLogin(loginRequest).build();
         ctx.writeAndFlush(data);
+        connectionContext.setStatus(ClientConnectionStatus.NOT_LOGGED_IN);
+    }
+    
+    private void sendSubscriptionEvent(final ChannelHandlerContext ctx, final 
CDCResponse response, final ClientConnectionContext connectionContext) {
+        if (response.getStatus() == Status.SUCCEED) {
+            log.info("login success, username {}", username);
+            connectionContext.setStatus(ClientConnectionStatus.LOGGING_IN);
+            ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+        } else {
+            log.error("login failed, username {}", username);
+        }
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
index 569d9d4af14..e1aa325a31d 100644
--- 
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++ 
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -21,6 +21,8 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
@@ -62,21 +64,38 @@ public final class SubscriptionRequestHandler extends 
ChannelInboundHandlerAdapt
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
         CDCResponse response = (CDCResponse) msg;
-        if (Status.SUCCEED == response.getStatus()) {
-            processSucceed(ctx, response);
+        if (response.getStatus() == Status.FAILED) {
+            log.error("received error response {}", msg);
+        }
+        ClientConnectionContext connectionContext = 
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
+        if (connectionContext.getStatus() == 
ClientConnectionStatus.LOGGING_IN) {
+            if (!response.hasCreateSubscriptionResult()) {
+                log.error("not find the create subscription result");
+                return;
+            }
+            sendCreateSubscriptionRequest(ctx, response, connectionContext);
+        } else if (connectionContext.getStatus() == 
ClientConnectionStatus.CREATING_SUBSCRIPTION) {
+            startSubscription(response, connectionContext);
         } else {
-            log.error("subscription response error {}", msg);
+            subscribeDataRecords(ctx);
         }
     }
     
-    private void processSucceed(final ChannelHandlerContext ctx, final 
CDCResponse response) {
-        if (response.hasCreateSubscriptionResult()) {
-            log.info("create subscription succeed, subscription name {}", 
response.getCreateSubscriptionResult().getSubscriptionName());
-            StartSubscriptionRequest startSubscriptionRequest = 
StartSubscriptionRequest.newBuilder().setSubscriptionName(subscriptionName).build();
-            Builder builder = 
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setStartSubscription(startSubscriptionRequest);
-            ctx.writeAndFlush(builder.build());
-        }
-        // TODO waiting for pipeline refactoring finished
+    private void sendCreateSubscriptionRequest(final ChannelHandlerContext 
ctx, final CDCResponse response, final ClientConnectionContext 
connectionContext) {
+        log.info("create subscription succeed, subscription name {}, exist 
{}", response.getCreateSubscriptionResult().getSubscriptionName(), 
response.getCreateSubscriptionResult().getExisting());
+        StartSubscriptionRequest startSubscriptionRequest = 
StartSubscriptionRequest.newBuilder().setDatabase(database).setSubscriptionName(subscriptionName).build();
+        Builder builder = 
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setStartSubscription(startSubscriptionRequest);
+        ctx.writeAndFlush(builder.build());
+        
connectionContext.setStatus(ClientConnectionStatus.CREATING_SUBSCRIPTION);
+    }
+    
+    private void startSubscription(final CDCResponse response, final 
ClientConnectionContext connectionContext) {
+        log.info("start subscription succeed, subscription name {}", 
response.getCreateSubscriptionResult().getSubscriptionName());
+        connectionContext.setStatus(ClientConnectionStatus.SUBSCRIBING);
+    }
+    
+    private void subscribeDataRecords(final ChannelHandlerContext ctx) {
+        // TODO to be implemented
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index a7e5fc7c5b6..741dda9187c 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -39,10 +39,10 @@ public interface CDCJobAPI extends 
InventoryIncrementalJobAPI, RequiredSPI {
     CDCProcessContext buildPipelineProcessContext(PipelineJobConfiguration 
pipelineJobConfig);
     
     /**
-     * Create CDC job config and start.
+     * Create CDC job config.
      *
      * @param event create CDC job event
      * @return job id
      */
-    String createJobAndStart(CreateSubscriptionJobParameter event);
+    boolean createJob(CreateSubscriptionJobParameter event);
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
index a3c182dfdca..7b93604f380 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
@@ -52,10 +52,14 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtil;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -63,6 +67,8 @@ import 
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtra
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
@@ -94,7 +100,7 @@ public final class CDCJobAPIImpl extends 
AbstractInventoryIncrementalJobAPIImpl
     private final YamlPipelineDataSourceConfigurationSwapper 
pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
     
     @Override
-    public String createJobAndStart(final CreateSubscriptionJobParameter 
event) {
+    public boolean createJob(final CreateSubscriptionJobParameter event) {
         YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
         yamlJobConfig.setDatabase(event.getDatabase());
         yamlJobConfig.setTableNames(event.getSubscribeTableNames());
@@ -109,8 +115,18 @@ public final class CDCJobAPIImpl extends 
AbstractInventoryIncrementalJobAPIImpl
         yamlJobConfig.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
         extendYamlJobConfiguration(yamlJobConfig);
         CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
-        start(jobConfig);
-        return jobConfig.getJobId();
+        ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
+        GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
+        String jobConfigKey = 
PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
+        if (repositoryAPI.isExisted(jobConfigKey)) {
+            log.warn("cdc job already exists in registry center, ignore, 
jobConfigKey={}", jobConfigKey);
+            return false;
+        }
+        
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
 getJobClassName());
+        JobConfigurationPOJO jobConfigurationPOJO = 
convertJobConfiguration(jobConfig);
+        jobConfigurationPOJO.setDisabled(true);
+        repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(jobConfigurationPOJO));
+        return true;
     }
     
     private ShardingSpherePipelineDataSourceConfiguration 
getDataSourceConfiguration(final ShardingSphereDatabase database) {
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
index 2b74ddfd816..e73199c02e1 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
@@ -29,4 +29,7 @@ public final class CDCConnectionContext {
     
     @Setter
     private volatile CDCConnectionStatus status;
+    
+    @Setter
+    private volatile String jobId;
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index ee6596dec22..c2702337ec6 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -32,7 +32,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -67,6 +66,8 @@ public final class CDCJobItemContext implements 
InventoryIncrementalJobItemConte
     
     private final PipelineDataSourceManager dataSourceManager;
     
+    private final ImporterConnector importerConnector;
+    
     private final Collection<InventoryTask> inventoryTasks = new 
LinkedList<>();
     
     private final Collection<IncrementalTask> incrementalTasks = new 
LinkedList<>();
@@ -120,7 +121,7 @@ public final class CDCJobItemContext implements 
InventoryIncrementalJobItemConte
     
     @Override
     public ImporterConnector getImporterConnector() {
-        return new CDCImporterConnector();
+        return importerConnector;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 76a9a59df09..e9fd10e3ac4 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -17,14 +17,37 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+import 
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 
 /**
  * CDC importer.
  */
 public final class CDCImporter extends AbstractLifecycleExecutor implements 
Importer {
     
+    @Getter(AccessLevel.PROTECTED)
+    private final ImporterConfiguration importerConfig;
+    
+    private final PipelineChannel channel;
+    
+    private final PipelineJobProgressListener jobProgressListener;
+    
+    private final JobRateLimitAlgorithm rateLimitAlgorithm;
+    
+    public CDCImporter(final ImporterConfiguration importerConfig, final 
ImporterConnector importerConnector, final PipelineChannel channel, final 
PipelineJobProgressListener jobProgressListener) {
+        this.importerConfig = importerConfig;
+        rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
+        this.channel = channel;
+        this.jobProgressListener = jobProgressListener;
+    }
+    
     @Override
     protected void runBlocking() {
         // TODO to be implemented
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
index f329a3d1623..d27023370aa 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
@@ -32,7 +32,7 @@ public final class CDCImporterCreator implements 
ImporterCreator {
     @Override
     public Importer createImporter(final ImporterConfiguration importerConfig, 
final ImporterConnector importerConnector, final PipelineChannel channel,
                                    final PipelineJobProgressListener 
jobProgressListener) {
-        return new CDCImporter();
+        return new CDCImporter(importerConfig, importerConnector, channel, 
jobProgressListener);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index c268dbbcd98..ba7a7e40b2c 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -17,17 +17,21 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
 
+import io.netty.channel.Channel;
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
 /**
  * CDC importer connector.
  */
+@RequiredArgsConstructor
 public final class CDCImporterConnector implements ImporterConnector {
     
+    private final Channel channel;
+    
     @Override
     public Object getConnector() {
-        // TODO to be implemented
-        return null;
+        return channel;
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 4f9619f01e5..8dbe1a30974 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -35,6 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
 /**
@@ -44,6 +45,8 @@ import 
org.apache.shardingsphere.elasticjob.api.ShardingContext;
 @Slf4j
 public final class CDCJob extends AbstractSimplePipelineJob {
     
+    private final ImporterConnector importerConnector;
+    
     private final CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance();
     
     private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
@@ -62,7 +65,7 @@ public final class CDCJob extends AbstractSimplePipelineJob {
         InventoryIncrementalJobItemProgress initProgress = 
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
         CDCProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
         CDCTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
-        return new CDCJobItemContext(jobConfig, shardingItem, initProgress, 
jobProcessContext, taskConfig, dataSourceManager);
+        return new CDCJobItemContext(jobConfig, shardingItem, initProgress, 
jobProcessContext, taskConfig, dataSourceManager, importerConnector);
     }
     
     protected PipelineTasksRunner buildPipelineTasksRunner(final 
PipelineJobItemContext pipelineJobItemContext) {
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/metadata/processor/CDCJobConfigurationChangedProcessor.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/metadata/processor/CDCJobConfigurationChangedProcessor.java
deleted file mode 100644
index f6607ec0c01..00000000000
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/metadata/processor/CDCJobConfigurationChangedProcessor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.metadata.processor;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
-import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * CDC job configuration changed processor.
- */
-@Slf4j
-public final class CDCJobConfigurationChangedProcessor implements 
PipelineChangedJobConfigurationProcessor {
-    
-    @Override
-    public void process(final Type eventType, final JobConfigurationPOJO 
jobConfigPOJO) {
-        String jobId = jobConfigPOJO.getJobName();
-        if (jobConfigPOJO.isDisabled()) {
-            PipelineJobCenter.stop(jobId);
-            return;
-        }
-        switch (eventType) {
-            case ADDED:
-            case UPDATED:
-                if (PipelineJobCenter.isJobExisting(jobId)) {
-                    log.info("{} added to executing jobs failed since it 
already exists", jobId);
-                } else {
-                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), 
PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> 
{
-                        if (null != throwable) {
-                            log.error("execute failed, jobId={}", jobId, 
throwable);
-                        }
-                    });
-                }
-                break;
-            case DELETED:
-                PipelineJobCenter.stop(jobId);
-                break;
-            default:
-                break;
-        }
-    }
-    
-    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
-        CDCJob job = new CDCJob();
-        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
-        OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, 
jobConfigPOJO.toJobConfiguration());
-        job.setJobBootstrap(oneOffJobBootstrap);
-        oneOffJobBootstrap.execute();
-    }
-    
-    @Override
-    public String getType() {
-        return new CDCJobType().getTypeName();
-    }
-}
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
 
b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
deleted file mode 100644
index 3ed558616e8..00000000000
--- 
a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.cdc.core.metadata.processor.CDCJobConfigurationChangedProcessor
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
index c44c9d1b208..86a26e2e302 100644
--- 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
@@ -22,11 +22,14 @@ import 
org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@RunWith(MockitoJUnitRunner.class)
 public final class CDCImporterCreatorTest {
     
     @Mock
@@ -34,7 +37,7 @@ public final class CDCImporterCreatorTest {
     
     @Test
     public void assertCreateCDCImporter() {
-        Importer actual = 
ImporterCreatorFactory.getInstance("CDC").createImporter(importerConfig, new 
CDCImporterConnector(), null, null);
+        Importer actual = 
ImporterCreatorFactory.getInstance("CDC").createImporter(importerConfig, new 
CDCImporterConnector(null), null, null);
         assertThat(actual, instanceOf(CDCImporter.class));
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto 
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 422a0c9625b..3eac0066d39 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -78,18 +78,22 @@ message CreateSubscriptionRequest {
 }
 
 message StartSubscriptionRequest {
-  string subscriptionName = 1;
+  string database = 1;
+  string subscriptionName = 2;
 }
 
 message StopSubscriptionRequest {
-  string subscriptionName = 1;
+  string database = 1;
+  string subscriptionName = 2;
 }
 
 message DropSubscriptionRequest {
-  string subscriptionName = 1;
+  string database = 1;
+  string subscriptionName = 2;
 }
 
 message AckRequest {
-  string subscriptionName = 1;
-  string ack_id = 2;
+  string database = 1;
+  string subscriptionName = 2;
+  string ack_id = 3;
 }
diff --git 
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto 
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
index 4dcef30b85d..a583f3f664c 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -34,7 +34,7 @@ message CDCResponse {
   oneof response {
     ServerGreetingResult server_greeting_result = 3;
     CreateSubscriptionResult create_subscription_result = 4;
-    FetchRecordResult fetch_record_result = 5;
+    DataRecordResult data_record_result = 5;
   }
 
   optional string error_code = 14;
@@ -67,7 +67,7 @@ message ClobValue {
   string value = 1;
 }
 
-message FetchRecordResult {
+message DataRecordResult {
   message Record {
     map<string, google.protobuf.Any> before = 1;
     map<string, google.protobuf.Any> after = 2;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 3fb5d12b4c7..9f36b2ed32d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -26,8 +26,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
-import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
@@ -35,6 +33,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHas
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import 
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -96,13 +96,13 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
             return Optional.of(jobId);
         }
         repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), 
getJobClassName());
-        repositoryAPI.persist(jobConfigKey, 
convertJobConfigurationToText(jobConfig));
+        repositoryAPI.persist(jobConfigKey, 
YamlEngine.marshal(convertJobConfiguration(jobConfig)));
         return Optional.of(jobId);
     }
     
     protected abstract String getJobClassName();
     
-    private String convertJobConfigurationToText(final 
PipelineJobConfiguration jobConfig) {
+    protected JobConfigurationPOJO convertJobConfiguration(final 
PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
         jobConfigPOJO.setJobName(jobConfig.getJobId());
         jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
@@ -110,7 +110,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
         String createTimeFormat = 
LocalDateTime.now().format(DATE_TIME_FORMATTER);
         jobConfigPOJO.getProps().setProperty("create_time", createTimeFormat);
         jobConfigPOJO.getProps().setProperty("start_time_millis", 
System.currentTimeMillis() + "");
-        return YamlEngine.marshal(jobConfigPOJO);
+        return jobConfigPOJO;
     }
     
     protected abstract YamlPipelineJobConfiguration 
swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
index 98e7d85aa87..2cbf39df5da 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
@@ -23,6 +23,8 @@ import 
org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
+import java.util.Optional;
+
 /**
  * Pipeline changed job configuration processor factory.
  */
@@ -39,7 +41,7 @@ public final class 
PipelineChangedJobConfigurationProcessorFactory {
      * @param jobType job type
      * @return instance
      */
-    public static PipelineChangedJobConfigurationProcessor getInstance(final 
JobType jobType) {
-        return 
TypedSPIRegistry.getRegisteredService(PipelineChangedJobConfigurationProcessor.class,
 jobType.getTypeName());
+    public static Optional<PipelineChangedJobConfigurationProcessor> 
findInstance(final JobType jobType) {
+        return 
TypedSPIRegistry.findRegisteredService(PipelineChangedJobConfigurationProcessor.class,
 jobType.getTypeName());
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
index 18da7257ad5..95ca1d43c33 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessorFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -52,7 +51,7 @@ public final class ChangedJobConfigurationDispatcher 
implements PipelineMetaData
             return;
         }
         log.info("{} job configuration: {}, disabled={}", event.getType(), 
event.getKey(), jobConfigPOJO.isDisabled());
-        PipelineChangedJobConfigurationProcessor processor = 
PipelineChangedJobConfigurationProcessorFactory.getInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()));
-        processor.process(event.getType(), jobConfigPOJO);
+        
PipelineChangedJobConfigurationProcessorFactory.findInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()))
+                .ifPresent(processor -> processor.process(event.getType(), 
jobConfigPOJO));
     }
 }
diff --git 
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
 
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 34b1aeedeab..42d2d8c77d2 100644
--- 
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ 
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -18,18 +18,32 @@
 package org.apache.shardingsphere.proxy.backend.handler.cdc;
 
 import com.google.common.base.Strings;
+import io.netty.channel.Channel;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
 import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CreateSubscriptionResult;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.TableRule;
 
@@ -71,8 +85,13 @@ public final class CDCBackendHandler {
         }
         CreateSubscriptionJobParameter parameter = new 
CreateSubscriptionJobParameter(subscriptionRequest.getDatabase(), tableNames, 
subscriptionRequest.getSubscriptionName(),
                 subscriptionRequest.getSubscriptionMode().name(), 
actualDataNodesMap);
-        CDCJobAPIFactory.getInstance().createJobAndStart(parameter);
-        return 
CDCResponseGenerator.succeedBuilder(request.getRequestId()).build();
+        if (CDCJobAPIFactory.getInstance().createJob(parameter)) {
+            return 
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
+                    
.setSubscriptionName(subscriptionRequest.getSubscriptionName()).setExisting(false).build()).build();
+        } else {
+            return 
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
+                    
.setSubscriptionName(subscriptionRequest.getSubscriptionName()).setExisting(true).build()).build();
+        }
     }
     
     private List<DataNode> getActualDataNodes(final ShardingRule shardingRule, 
final String logicTableName) {
@@ -80,4 +99,51 @@ public final class CDCBackendHandler {
         // TODO support virtual data source name
         return tableRule.getActualDataNodes();
     }
+    
+    /**
+     * Start subscription.
+     *
+     * <p>Resolves the CDC job id from the requested database and subscription name, enables the job in its
+     * job configuration, runs it as a one-off job whose importer connector wraps the client channel,
+     * and records the subscribed status and job id on the connection context.
+     * A failed response is returned when the job configuration cannot be found or the job is not disabled.</p>
+     *
+     * @param request request
+     * @param channel channel
+     * @param connectionContext connection context
+     * @return CDC response
+     */
+    public CDCResponse startSubscription(final CDCRequest request, final Channel channel, final CDCConnectionContext connectionContext) {
+        StartSubscriptionRequest startSubscriptionRequest = request.getStartSubscription();
+        CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance();
+        String jobId = jobAPI.marshalJobId(new CDCJobId(startSubscriptionRequest.getDatabase(), startSubscriptionRequest.getSubscriptionName()));
+        CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) jobAPI.getJobConfiguration(jobId);
+        if (null == cdcJobConfig) {
+            return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, String.format("the %s job config isn't exists",
+                    startSubscriptionRequest.getSubscriptionName()));
+        }
+        JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+        // An enabled job is treated as already consumed by another client.
+        if (!jobConfigPOJO.isDisabled()) {
+            return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, String.format("the %s is being used", startSubscriptionRequest.getSubscriptionName()));
+        }
+        jobConfigPOJO.setDisabled(false);
+        PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+        CDCJob job = new CDCJob(new CDCImporterConnector(channel));
+        // Register and trigger the job exactly once. Looping per sharding item re-registered the same job instance
+        // under the same id and replaced its bootstrap on every pass, leaving only the last bootstrap reachable.
+        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+        job.setJobBootstrap(oneOffJobBootstrap);
+        oneOffJobBootstrap.execute();
+        connectionContext.setStatus(CDCConnectionStatus.SUBSCRIBED);
+        connectionContext.setJobId(jobId);
+        return CDCResponseGenerator.succeedBuilder(request.getRequestId()).build();
+    }
+    
+    /**
+     * Stop subscription.
+     *
+     * <p>Stops the job through the pipeline job center first, then loads its job configuration,
+     * flags it as disabled and writes the configuration back.</p>
+     *
+     * @param jobId job id
+     */
+    public void stopSubscription(final String jobId) {
+        PipelineJobCenter.stop(jobId);
+        JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+        jobConfigPOJO.setDisabled(true);
+        PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+    }
 }
diff --git 
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
 
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
index 24823454901..53e2c2c29b0 100644
--- 
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
+++ 
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
@@ -47,8 +47,8 @@ import java.util.Properties;
 public final class FixtureCDCJobAPI implements InventoryIncrementalJobAPI, 
CDCJobAPI {
     
     @Override
-    public String createJobAndStart(final CreateSubscriptionJobParameter 
event) {
-        return "";
+    public boolean createJob(final CreateSubscriptionJobParameter event) {
+        return true;
     }
     
     @Override
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 7be8bea54d1..451e6ac665e 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
 import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
@@ -67,6 +68,15 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         ctx.writeAndFlush(response);
     }
     
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) {
+        CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+        // The attribute value is null when no connection context was attached to this channel; skip the job cleanup then
+        // instead of failing with a NullPointerException while the channel is being torn down.
+        if (null != connectionContext && null != connectionContext.getJobId()) {
+            backendHandler.stopSubscription(connectionContext.getJobId());
+        }
+        ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
+    }
+    
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
         CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
@@ -131,7 +141,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         CreateSubscriptionRequest createSubscriptionRequest = 
request.getCreateSubscription();
         if (createSubscriptionRequest.getTableNamesList().isEmpty() || 
createSubscriptionRequest.getDatabase().isEmpty() || 
createSubscriptionRequest.getSubscriptionName().isEmpty()
                 || createSubscriptionRequest.getSubscriptionMode() == 
SubscriptionMode.UNKNOWN) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal subscription request 
parameter"))
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription 
request parameter"))
                     .addListener(ChannelFutureListener.CLOSE);
             return;
         }
@@ -140,9 +150,18 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
     }
     
     private void processStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
-        // TODO waiting for pipeline refactoring finished
-        connectionContext.setStatus(CDCConnectionStatus.SUBSCRIBED);
-        ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+        // A request without a start-subscription body is rejected and the channel is closed once the response is written.
+        if (!request.hasStartSubscription()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start subscription request body"))
+                    .addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        StartSubscriptionRequest startSubscriptionRequest = request.getStartSubscription();
+        if (startSubscriptionRequest.getDatabase().isEmpty() || startSubscriptionRequest.getSubscriptionName().isEmpty()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start subscription request parameter"))
+                    .addListener(ChannelFutureListener.CLOSE);
+            // Stop here: the request is invalid and the channel is closing, so the job must not be started
+            // and no second response may be written.
+            return;
+        }
+        CDCResponse response = backendHandler.startSubscription(request, ctx.channel(), connectionContext);
+        ctx.writeAndFlush(response);
     }
     
     private void stopStartSubscription(final ChannelHandlerContext ctx, final 
CDCRequest request, final CDCConnectionContext connectionContext) {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
index a07ca7de1b3..1ad3adb1ab4 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
@@ -19,20 +19,27 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.metadata.node.event
 
 import org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessorFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor.ConsistencyCheckChangedJobConfigurationProcessor;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor.MigrationChangedJobConfigurationProcessor;
-import org.hamcrest.MatcherAssert;
 import org.junit.Test;
 
+import java.util.Optional;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public final class PipelineChangedJobConfigurationProcessorFactoryTest {
     
     @Test
     public void assertGetInstance() {
-        MatcherAssert.assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(new MigrationJobType()), instanceOf(MigrationChangedJobConfigurationProcessor.class));
-        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(new ConsistencyCheckJobType()), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
+        Optional<PipelineChangedJobConfigurationProcessor> migrationProcessor = PipelineChangedJobConfigurationProcessorFactory.findInstance(new MigrationJobType());
+        assertTrue(migrationProcessor.isPresent());
+        assertThat(migrationProcessor.get(), instanceOf(MigrationChangedJobConfigurationProcessor.class));
+        Optional<PipelineChangedJobConfigurationProcessor> consistencyCheckProcessor = PipelineChangedJobConfigurationProcessorFactory.findInstance(new ConsistencyCheckJobType());
+        assertTrue(consistencyCheckProcessor.isPresent());
+        assertThat(consistencyCheckProcessor.get(), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
     }
 }

Reply via email to