This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 440536943cc Add CDC job start subscription and add client connection
status. (#23026)
440536943cc is described below
commit 440536943ccad254425c25aadb1b491ffd9d1670
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Dec 26 12:24:23 2022 +0800
Add CDC job start subscription and add client connection status. (#23026)
* Add CDC job start subscription and add client connection status.
* Fix codestyle
* Remove blank line
---
.../spi/importer/ImporterCreatorFactory.java | 6 +-
.../client/constant/ClientConnectionStatus.java} | 22 +++---
.../client/context/ClientConnectionContext.java} | 13 ++--
.../cdc/client/handler/LoginRequestHandler.java | 60 ++++++++--------
.../client/handler/SubscriptionRequestHandler.java | 41 ++++++++---
.../data/pipeline/cdc/api/CDCJobAPI.java | 4 +-
.../data/pipeline/cdc/api/impl/CDCJobAPIImpl.java | 22 +++++-
.../pipeline/cdc/context/CDCConnectionContext.java | 3 +
.../cdc/context/job/CDCJobItemContext.java | 5 +-
.../pipeline/cdc/core/importer/CDCImporter.java | 23 +++++++
.../cdc/core/importer/CDCImporterCreator.java | 2 +-
.../importer/connector/CDCImporterConnector.java | 8 ++-
.../data/pipeline/cdc/core/job/CDCJob.java | 5 +-
.../CDCJobConfigurationChangedProcessor.java | 79 ----------------------
...andler.PipelineChangedJobConfigurationProcessor | 18 -----
.../cdc/core/importer/CDCImporterCreatorTest.java | 5 +-
.../src/main/proto/CDCRequestProtocol.proto | 14 ++--
.../src/main/proto/CDCResponseProtocol.proto | 4 +-
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 10 +--
...ineChangedJobConfigurationProcessorFactory.java | 6 +-
.../impl/ChangedJobConfigurationDispatcher.java | 5 +-
.../backend/handler/cdc/CDCBackendHandler.java | 72 +++++++++++++++++++-
.../handler/cdc/fixture/FixtureCDCJobAPI.java | 4 +-
.../frontend/netty/CDCChannelInboundHandler.java | 27 ++++++--
...hangedJobConfigurationProcessorFactoryTest.java | 13 +++-
25 files changed, 277 insertions(+), 194 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
index 35e85f0d5f7..0445ec72db2 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
@@ -35,10 +35,10 @@ public final class ImporterCreatorFactory {
/**
* Get importer creator instance.
*
- * @param databaseType database type
+ * @param importType import type
* @return importer creator
*/
- public static ImporterCreator getInstance(final String databaseType) {
- return TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
databaseType);
+ public static ImporterCreator getInstance(final String importType) {
+ return TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
importType);
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
similarity index 71%
copy from
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
copy to
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
index 2b74ddfd816..55583b42b51 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
@@ -15,18 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.context;
-
-import lombok.Getter;
-import lombok.Setter;
-import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+package org.apache.shardingsphere.data.pipeline.cdc.client.constant;
/**
- * CDC connection context.
+ * Client connection status.
*/
-@Getter
-public final class CDCConnectionContext {
+public enum ClientConnectionStatus {
+
+ CONNECTED,
+
+ NOT_LOGGED_IN,
+
+ LOGGING_IN,
+
+ CREATING_SUBSCRIPTION,
- @Setter
- private volatile CDCConnectionStatus status;
+ SUBSCRIBING
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
similarity index 66%
copy from
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
copy to
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
index 2b74ddfd816..77eaaf19854 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
@@ -15,18 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.context;
+package org.apache.shardingsphere.data.pipeline.cdc.client.context;
+import io.netty.util.AttributeKey;
import lombok.Getter;
import lombok.Setter;
-import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
/**
- * CDC connection context.
+ * Client connection context.
*/
@Getter
-public final class CDCConnectionContext {
+public final class ClientConnectionContext {
+
+ public static final AttributeKey<ClientConnectionContext> CONTEXT_KEY =
AttributeKey.valueOf("client.context");
@Setter
- private volatile CDCConnectionStatus status;
+ private volatile ClientConnectionStatus status;
}
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index 10974d5e6d9..2372f37fc86 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -20,9 +20,10 @@ package
org.apache.shardingsphere.data.pipeline.cdc.client.handler;
import com.google.common.hash.Hashing;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.AttributeKey;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
+import
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
@@ -34,8 +35,6 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
-import java.util.Objects;
-
/**
* Login request handler.
*/
@@ -43,50 +42,57 @@ import java.util.Objects;
@RequiredArgsConstructor
public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
- private static final AttributeKey<String> LOGIN_REQUEST_ID_KEY =
AttributeKey.valueOf("login.request.id");
-
private final String username;
private final String password;
- private boolean loggedIn;
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) {
+ ClientConnectionContext context = new ClientConnectionContext();
+ context.setStatus(ClientConnectionStatus.CONNECTED);
+
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(context);
+ }
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
- ctx.channel().attr(LOGIN_REQUEST_ID_KEY).set(null);
+
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(null);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
{
- if (loggedIn) {
- ctx.fireChannelRead(msg);
- return;
- }
CDCResponse response = (CDCResponse) msg;
- if (response.hasServerGreetingResult()) {
- ServerGreetingResult serverGreetingResult =
response.getServerGreetingResult();
- log.info("Server greeting result, server version: {}, protocol
version: {}", serverGreetingResult.getServerVersion(),
serverGreetingResult.getProtocolVersion());
- sendLoginRequest(ctx);
- return;
- }
- if (Status.FAILED == response.getStatus()) {
- log.error("login failed, {}", msg);
- return;
- }
- if (Status.SUCCEED == response.getStatus() &&
Objects.equals(ctx.channel().attr(LOGIN_REQUEST_ID_KEY).get(),
response.getRequestId())) {
- log.info("login success, username {}", username);
- loggedIn = true;
- ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+ ClientConnectionContext connectionContext =
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
+ switch (connectionContext.getStatus()) {
+ case CONNECTED:
+ setLogindRequest(ctx, response, connectionContext);
+ break;
+ case NOT_LOGGED_IN:
+ sendSubscriptionEvent(ctx, response, connectionContext);
+ break;
+ default:
+ ctx.fireChannelRead(msg);
}
}
- private void sendLoginRequest(final ChannelHandlerContext ctx) {
+ private void setLogindRequest(final ChannelHandlerContext ctx, final
CDCResponse response, final ClientConnectionContext connectionContext) {
+ ServerGreetingResult serverGreetingResult =
response.getServerGreetingResult();
+ log.info("Server greeting result, server version: {}, protocol
version: {}", serverGreetingResult.getServerVersion(),
serverGreetingResult.getProtocolVersion());
String encryptPassword =
Hashing.sha256().hashBytes(password.getBytes()).toString().toUpperCase();
LoginRequest loginRequest =
LoginRequest.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build()).build();
String loginRequestId = RequestIdUtil.generateRequestId();
- ctx.channel().attr(LOGIN_REQUEST_ID_KEY).setIfAbsent(loginRequestId);
CDCRequest data =
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLogin(loginRequest).build();
ctx.writeAndFlush(data);
+ connectionContext.setStatus(ClientConnectionStatus.NOT_LOGGED_IN);
+ }
+
+ private void sendSubscriptionEvent(final ChannelHandlerContext ctx, final
CDCResponse response, final ClientConnectionContext connectionContext) {
+ if (response.getStatus() == Status.SUCCEED) {
+ log.info("login success, username {}", username);
+ connectionContext.setStatus(ClientConnectionStatus.LOGGING_IN);
+ ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+ } else {
+ log.error("login failed, username {}", username);
+ }
}
@Override
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
index 569d9d4af14..e1aa325a31d 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -21,6 +21,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
+import
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
@@ -62,21 +64,38 @@ public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapt
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
{
CDCResponse response = (CDCResponse) msg;
- if (Status.SUCCEED == response.getStatus()) {
- processSucceed(ctx, response);
+ if (response.getStatus() == Status.FAILED) {
+ log.error("received error response {}", msg);
+ }
+ ClientConnectionContext connectionContext =
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
+ if (connectionContext.getStatus() ==
ClientConnectionStatus.LOGGING_IN) {
+ if (!response.hasCreateSubscriptionResult()) {
+ log.error("not find the create subscription result");
+ return;
+ }
+ sendCreateSubscriptionRequest(ctx, response, connectionContext);
+ } else if (connectionContext.getStatus() ==
ClientConnectionStatus.CREATING_SUBSCRIPTION) {
+ startSubscription(response, connectionContext);
} else {
- log.error("subscription response error {}", msg);
+ subscribeDataRecords(ctx);
}
}
- private void processSucceed(final ChannelHandlerContext ctx, final
CDCResponse response) {
- if (response.hasCreateSubscriptionResult()) {
- log.info("create subscription succeed, subscription name {}",
response.getCreateSubscriptionResult().getSubscriptionName());
- StartSubscriptionRequest startSubscriptionRequest =
StartSubscriptionRequest.newBuilder().setSubscriptionName(subscriptionName).build();
- Builder builder =
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setStartSubscription(startSubscriptionRequest);
- ctx.writeAndFlush(builder.build());
- }
- // TODO waiting for pipeline refactoring finished
+ private void sendCreateSubscriptionRequest(final ChannelHandlerContext
ctx, final CDCResponse response, final ClientConnectionContext
connectionContext) {
+ log.info("create subscription succeed, subscription name {}, exist
{}", response.getCreateSubscriptionResult().getSubscriptionName(),
response.getCreateSubscriptionResult().getExisting());
+ StartSubscriptionRequest startSubscriptionRequest =
StartSubscriptionRequest.newBuilder().setDatabase(database).setSubscriptionName(subscriptionName).build();
+ Builder builder =
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setStartSubscription(startSubscriptionRequest);
+ ctx.writeAndFlush(builder.build());
+
connectionContext.setStatus(ClientConnectionStatus.CREATING_SUBSCRIPTION);
+ }
+
+ private void startSubscription(final CDCResponse response, final
ClientConnectionContext connectionContext) {
+ log.info("start subscription succeed, subscription name {}",
response.getCreateSubscriptionResult().getSubscriptionName());
+ connectionContext.setStatus(ClientConnectionStatus.SUBSCRIBING);
+ }
+
+ private void subscribeDataRecords(final ChannelHandlerContext ctx) {
+ // TODO to be implemented
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index a7e5fc7c5b6..741dda9187c 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -39,10 +39,10 @@ public interface CDCJobAPI extends
InventoryIncrementalJobAPI, RequiredSPI {
CDCProcessContext buildPipelineProcessContext(PipelineJobConfiguration
pipelineJobConfig);
/**
- * Create CDC job config and start.
+ * Create CDC job config.
*
* @param event create CDC job event
* @return job id
*/
- String createJobAndStart(CreateSubscriptionJobParameter event);
+ boolean createJob(CreateSubscriptionJobParameter event);
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
index a3c182dfdca..7b93604f380 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
@@ -52,10 +52,14 @@ import
org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtil;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -63,6 +67,8 @@ import
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtra
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
@@ -94,7 +100,7 @@ public final class CDCJobAPIImpl extends
AbstractInventoryIncrementalJobAPIImpl
private final YamlPipelineDataSourceConfigurationSwapper
pipelineDataSourceConfigSwapper = new
YamlPipelineDataSourceConfigurationSwapper();
@Override
- public String createJobAndStart(final CreateSubscriptionJobParameter
event) {
+ public boolean createJob(final CreateSubscriptionJobParameter event) {
YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
yamlJobConfig.setDatabase(event.getDatabase());
yamlJobConfig.setTableNames(event.getSubscribeTableNames());
@@ -109,8 +115,18 @@ public final class CDCJobAPIImpl extends
AbstractInventoryIncrementalJobAPIImpl
yamlJobConfig.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
extendYamlJobConfiguration(yamlJobConfig);
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
- start(jobConfig);
- return jobConfig.getJobId();
+ ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
+ GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
+ String jobConfigKey =
PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
+ if (repositoryAPI.isExisted(jobConfigKey)) {
+ log.warn("cdc job already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
+ return false;
+ }
+
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getJobClassName());
+ JobConfigurationPOJO jobConfigurationPOJO =
convertJobConfiguration(jobConfig);
+ jobConfigurationPOJO.setDisabled(true);
+ repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(jobConfigurationPOJO));
+ return true;
}
private ShardingSpherePipelineDataSourceConfiguration
getDataSourceConfiguration(final ShardingSphereDatabase database) {
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
index 2b74ddfd816..e73199c02e1 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
@@ -29,4 +29,7 @@ public final class CDCConnectionContext {
@Setter
private volatile CDCConnectionStatus status;
+
+ @Setter
+ private volatile String jobId;
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index ee6596dec22..c2702337ec6 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -32,7 +32,6 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -67,6 +66,8 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
private final PipelineDataSourceManager dataSourceManager;
+ private final ImporterConnector importerConnector;
+
private final Collection<InventoryTask> inventoryTasks = new
LinkedList<>();
private final Collection<IncrementalTask> incrementalTasks = new
LinkedList<>();
@@ -120,7 +121,7 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
@Override
public ImporterConnector getImporterConnector() {
- return new CDCImporterConnector();
+ return importerConnector;
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 76a9a59df09..e9fd10e3ac4 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -17,14 +17,37 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+import lombok.AccessLevel;
+import lombok.Getter;
+import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
/**
* CDC importer.
*/
public final class CDCImporter extends AbstractLifecycleExecutor implements
Importer {
+ @Getter(AccessLevel.PROTECTED)
+ private final ImporterConfiguration importerConfig;
+
+ private final PipelineChannel channel;
+
+ private final PipelineJobProgressListener jobProgressListener;
+
+ private final JobRateLimitAlgorithm rateLimitAlgorithm;
+
+ public CDCImporter(final ImporterConfiguration importerConfig, final
ImporterConnector importerConnector, final PipelineChannel channel, final
PipelineJobProgressListener jobProgressListener) {
+ this.importerConfig = importerConfig;
+ rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
+ this.channel = channel;
+ this.jobProgressListener = jobProgressListener;
+ }
+
@Override
protected void runBlocking() {
// TODO to be implemented
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
index f329a3d1623..d27023370aa 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
@@ -32,7 +32,7 @@ public final class CDCImporterCreator implements
ImporterCreator {
@Override
public Importer createImporter(final ImporterConfiguration importerConfig,
final ImporterConnector importerConnector, final PipelineChannel channel,
final PipelineJobProgressListener
jobProgressListener) {
- return new CDCImporter();
+ return new CDCImporter(importerConfig, importerConnector, channel,
jobProgressListener);
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index c268dbbcd98..ba7a7e40b2c 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -17,17 +17,21 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
+import io.netty.channel.Channel;
+import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
/**
* CDC importer connector.
*/
+@RequiredArgsConstructor
public final class CDCImporterConnector implements ImporterConnector {
+ private final Channel channel;
+
@Override
public Object getConnector() {
- // TODO to be implemented
- return null;
+ return channel;
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 4f9619f01e5..8dbe1a30974 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -35,6 +35,7 @@ import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
import
org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
/**
@@ -44,6 +45,8 @@ import
org.apache.shardingsphere.elasticjob.api.ShardingContext;
@Slf4j
public final class CDCJob extends AbstractSimplePipelineJob {
+ private final ImporterConnector importerConnector;
+
private final CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance();
private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
@@ -62,7 +65,7 @@ public final class CDCJob extends AbstractSimplePipelineJob {
InventoryIncrementalJobItemProgress initProgress =
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
CDCProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
CDCTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
- return new CDCJobItemContext(jobConfig, shardingItem, initProgress,
jobProcessContext, taskConfig, dataSourceManager);
+ return new CDCJobItemContext(jobConfig, shardingItem, initProgress,
jobProcessContext, taskConfig, dataSourceManager, importerConnector);
}
protected PipelineTasksRunner buildPipelineTasksRunner(final
PipelineJobItemContext pipelineJobItemContext) {
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/metadata/processor/CDCJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/metadata/processor/CDCJobConfigurationChangedProcessor.java
deleted file mode 100644
index f6607ec0c01..00000000000
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/metadata/processor/CDCJobConfigurationChangedProcessor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.metadata.processor;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
-import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * CDC job configuration changed processor.
- */
-@Slf4j
-public final class CDCJobConfigurationChangedProcessor implements
PipelineChangedJobConfigurationProcessor {
-
- @Override
- public void process(final Type eventType, final JobConfigurationPOJO
jobConfigPOJO) {
- String jobId = jobConfigPOJO.getJobName();
- if (jobConfigPOJO.isDisabled()) {
- PipelineJobCenter.stop(jobId);
- return;
- }
- switch (eventType) {
- case ADDED:
- case UPDATED:
- if (PipelineJobCenter.isJobExisting(jobId)) {
- log.info("{} added to executing jobs failed since it
already exists", jobId);
- } else {
- CompletableFuture.runAsync(() -> execute(jobConfigPOJO),
PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) ->
{
- if (null != throwable) {
- log.error("execute failed, jobId={}", jobId,
throwable);
- }
- });
- }
- break;
- case DELETED:
- PipelineJobCenter.stop(jobId);
- break;
- default:
- break;
- }
- }
-
- private void execute(final JobConfigurationPOJO jobConfigPOJO) {
- CDCJob job = new CDCJob();
- PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
- OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
- job.setJobBootstrap(oneOffJobBootstrap);
- oneOffJobBootstrap.execute();
- }
-
- @Override
- public String getType() {
- return new CDCJobType().getTypeName();
- }
-}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
deleted file mode 100644
index 3ed558616e8..00000000000
---
a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.cdc.core.metadata.processor.CDCJobConfigurationChangedProcessor
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
index c44c9d1b208..86a26e2e302 100644
---
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
@@ -22,11 +22,14 @@ import
org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
import
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
+@RunWith(MockitoJUnitRunner.class)
public final class CDCImporterCreatorTest {
@Mock
@@ -34,7 +37,7 @@ public final class CDCImporterCreatorTest {
@Test
public void assertCreateCDCImporter() {
- Importer actual =
ImporterCreatorFactory.getInstance("CDC").createImporter(importerConfig, new
CDCImporterConnector(), null, null);
+ Importer actual =
ImporterCreatorFactory.getInstance("CDC").createImporter(importerConfig, new
CDCImporterConnector(null), null, null);
assertThat(actual, instanceOf(CDCImporter.class));
}
}
diff --git
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 422a0c9625b..3eac0066d39 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -78,18 +78,22 @@ message CreateSubscriptionRequest {
}
message StartSubscriptionRequest {
- string subscriptionName = 1;
+ string database = 1;
+ string subscriptionName = 2;
}
message StopSubscriptionRequest {
- string subscriptionName = 1;
+ string database = 1;
+ string subscriptionName = 2;
}
message DropSubscriptionRequest {
- string subscriptionName = 1;
+ string database = 1;
+ string subscriptionName = 2;
}
message AckRequest {
- string subscriptionName = 1;
- string ack_id = 2;
+ string database = 1;
+ string subscriptionName = 2;
+ string ack_id = 3;
}
diff --git
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
index 4dcef30b85d..a583f3f664c 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -34,7 +34,7 @@ message CDCResponse {
oneof response {
ServerGreetingResult server_greeting_result = 3;
CreateSubscriptionResult create_subscription_result = 4;
- FetchRecordResult fetch_record_result = 5;
+ DataRecordResult data_record_result = 5;
}
optional string error_code = 14;
@@ -67,7 +67,7 @@ message ClobValue {
string value = 1;
}
-message FetchRecordResult {
+message DataRecordResult {
message Record {
map<string, google.protobuf.Any> before = 1;
map<string, google.protobuf.Any> after = 2;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 3fb5d12b4c7..9f36b2ed32d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -26,8 +26,6 @@ import
org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
-import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
@@ -35,6 +33,8 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHas
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import
org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -96,13 +96,13 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
return Optional.of(jobId);
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
getJobClassName());
- repositoryAPI.persist(jobConfigKey,
convertJobConfigurationToText(jobConfig));
+ repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(convertJobConfiguration(jobConfig)));
return Optional.of(jobId);
}
protected abstract String getJobClassName();
- private String convertJobConfigurationToText(final
PipelineJobConfiguration jobConfig) {
+ protected JobConfigurationPOJO convertJobConfiguration(final
PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
jobConfigPOJO.setJobName(jobConfig.getJobId());
jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
@@ -110,7 +110,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
String createTimeFormat =
LocalDateTime.now().format(DATE_TIME_FORMATTER);
jobConfigPOJO.getProps().setProperty("create_time", createTimeFormat);
jobConfigPOJO.getProps().setProperty("start_time_millis",
System.currentTimeMillis() + "");
- return YamlEngine.marshal(jobConfigPOJO);
+ return jobConfigPOJO;
}
protected abstract YamlPipelineJobConfiguration
swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
index 98e7d85aa87..2cbf39df5da 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactory.java
@@ -23,6 +23,8 @@ import
org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+import java.util.Optional;
+
/**
* Pipeline changed job configuration processor factory.
*/
@@ -39,7 +41,7 @@ public final class
PipelineChangedJobConfigurationProcessorFactory {
* @param jobType job type
* @return instance
*/
- public static PipelineChangedJobConfigurationProcessor getInstance(final
JobType jobType) {
- return
TypedSPIRegistry.getRegisteredService(PipelineChangedJobConfigurationProcessor.class,
jobType.getTypeName());
+ public static Optional<PipelineChangedJobConfigurationProcessor>
findInstance(final JobType jobType) {
+ return
TypedSPIRegistry.findRegisteredService(PipelineChangedJobConfigurationProcessor.class,
jobType.getTypeName());
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
index 18da7257ad5..95ca1d43c33 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessorFactory;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -52,7 +51,7 @@ public final class ChangedJobConfigurationDispatcher
implements PipelineMetaData
return;
}
log.info("{} job configuration: {}, disabled={}", event.getType(),
event.getKey(), jobConfigPOJO.isDisabled());
- PipelineChangedJobConfigurationProcessor processor =
PipelineChangedJobConfigurationProcessorFactory.getInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()));
- processor.process(event.getType(), jobConfigPOJO);
+
PipelineChangedJobConfigurationProcessorFactory.findInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()))
+ .ifPresent(processor -> processor.process(event.getType(),
jobConfigPOJO));
}
}
diff --git
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 34b1aeedeab..42d2d8c77d2 100644
---
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -18,18 +18,32 @@
package org.apache.shardingsphere.proxy.backend.handler.cdc;
import com.google.common.base.Strings;
+import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CreateSubscriptionResult;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sharding.rule.TableRule;
@@ -71,8 +85,13 @@ public final class CDCBackendHandler {
}
CreateSubscriptionJobParameter parameter = new
CreateSubscriptionJobParameter(subscriptionRequest.getDatabase(), tableNames,
subscriptionRequest.getSubscriptionName(),
subscriptionRequest.getSubscriptionMode().name(),
actualDataNodesMap);
- CDCJobAPIFactory.getInstance().createJobAndStart(parameter);
- return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).build();
+ if (CDCJobAPIFactory.getInstance().createJob(parameter)) {
+ return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
+
.setSubscriptionName(subscriptionRequest.getSubscriptionName()).setExisting(false).build()).build();
+ } else {
+ return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
+
.setSubscriptionName(subscriptionRequest.getSubscriptionName()).setExisting(true).build()).build();
+ }
}
private List<DataNode> getActualDataNodes(final ShardingRule shardingRule,
final String logicTableName) {
@@ -80,4 +99,51 @@ public final class CDCBackendHandler {
// TODO support virtual data source name
return tableRule.getActualDataNodes();
}
+
+ /**
+ * Start subscription.
+ *
+ * @param request request
+ * @param channel channel
+ * @param connectionContext connection context
+ * @return CDC response
+ */
+ public CDCResponse startSubscription(final CDCRequest request, final
Channel channel, final CDCConnectionContext connectionContext) {
+ StartSubscriptionRequest startSubscriptionRequest =
request.getStartSubscription();
+ CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance();
+ String jobId = jobAPI.marshalJobId(new
CDCJobId(startSubscriptionRequest.getDatabase(),
startSubscriptionRequest.getSubscriptionName()));
+ CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration)
jobAPI.getJobConfiguration(jobId);
+ if (null == cdcJobConfig) {
+ return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, String.format("the %s job config isn't
exists",
+ startSubscriptionRequest.getSubscriptionName()));
+ }
+ JobConfigurationPOJO jobConfigPOJO =
PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+ if (!jobConfigPOJO.isDisabled()) {
+ return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, String.format("the %s is being used",
startSubscriptionRequest.getSubscriptionName()));
+ }
+ jobConfigPOJO.setDisabled(false);
+
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+ CDCJob job = new CDCJob(new CDCImporterConnector(channel));
+ for (int i = 0; i < cdcJobConfig.getJobShardingCount(); i++) {
+ PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+ OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
+ job.setJobBootstrap(oneOffJobBootstrap);
+ oneOffJobBootstrap.execute();
+ }
+ connectionContext.setStatus(CDCConnectionStatus.SUBSCRIBED);
+ connectionContext.setJobId(jobId);
+ return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).build();
+ }
+
+ /**
+ * Stop subscription.
+ *
+ * @param jobId job id
+ */
+ public void stopSubscription(final String jobId) {
+ PipelineJobCenter.stop(jobId);
+ JobConfigurationPOJO jobConfig =
PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+ jobConfig.setDisabled(true);
+
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfig);
+ }
}
diff --git
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
index 24823454901..53e2c2c29b0 100644
---
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
+++
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
@@ -47,8 +47,8 @@ import java.util.Properties;
public final class FixtureCDCJobAPI implements InventoryIncrementalJobAPI,
CDCJobAPI {
@Override
- public String createJobAndStart(final CreateSubscriptionJobParameter
event) {
- return "";
+ public boolean createJob(final CreateSubscriptionJobParameter event) {
+ return true;
}
@Override
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 7be8bea54d1..451e6ac665e 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
@@ -67,6 +68,15 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
ctx.writeAndFlush(response);
}
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) {
+ CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+ if (null != connectionContext.getJobId()) {
+ backendHandler.stopSubscription(connectionContext.getJobId());
+ }
+ ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
+ }
+
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
{
CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
@@ -131,7 +141,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
CreateSubscriptionRequest createSubscriptionRequest =
request.getCreateSubscription();
if (createSubscriptionRequest.getTableNamesList().isEmpty() ||
createSubscriptionRequest.getDatabase().isEmpty() ||
createSubscriptionRequest.getSubscriptionName().isEmpty()
|| createSubscriptionRequest.getSubscriptionMode() ==
SubscriptionMode.UNKNOWN) {
-
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal subscription request
parameter"))
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription
request parameter"))
.addListener(ChannelFutureListener.CLOSE);
return;
}
@@ -140,9 +150,18 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
}
private void processStartSubscription(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
- // TODO waiting for pipeline refactoring finished
- connectionContext.setStatus(CDCConnectionStatus.SUBSCRIBED);
-
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ if (!request.hasStartSubscription()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start subscription request
body"))
+ .addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ StartSubscriptionRequest startSubscriptionRequest =
request.getStartSubscription();
+ if (startSubscriptionRequest.getDatabase().isEmpty() ||
startSubscriptionRequest.getSubscriptionName().isEmpty()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start subscription request
parameter"))
+ .addListener(ChannelFutureListener.CLOSE);
+ }
+ CDCResponse response = backendHandler.startSubscription(request,
ctx.channel(), connectionContext);
+ ctx.writeAndFlush(response);
}
private void stopStartSubscription(final ChannelHandlerContext ctx, final
CDCRequest request, final CDCConnectionContext connectionContext) {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
index a07ca7de1b3..1ad3adb1ab4 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
@@ -19,20 +19,27 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.metadata.node.event
import
org.apache.shardingsphere.data.pipeline.core.job.type.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessorFactory;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor.ConsistencyCheckChangedJobConfigurationProcessor;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor.MigrationChangedJobConfigurationProcessor;
-import org.hamcrest.MatcherAssert;
import org.junit.Test;
+import java.util.Optional;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
public final class PipelineChangedJobConfigurationProcessorFactoryTest {
@Test
public void assertGetInstance() {
-
MatcherAssert.assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(new
MigrationJobType()),
instanceOf(MigrationChangedJobConfigurationProcessor.class));
-
assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(new
ConsistencyCheckJobType()),
instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
+ Optional<PipelineChangedJobConfigurationProcessor> migrationProcessor
= PipelineChangedJobConfigurationProcessorFactory.findInstance(new
MigrationJobType());
+ assertTrue(migrationProcessor.isPresent());
+ assertThat(migrationProcessor.get(),
instanceOf(MigrationChangedJobConfigurationProcessor.class));
+ Optional<PipelineChangedJobConfigurationProcessor>
consistencyCheckProcessor =
PipelineChangedJobConfigurationProcessorFactory.findInstance(new
ConsistencyCheckJobType());
+ assertTrue(consistencyCheckProcessor.isPresent());
+ assertThat(consistencyCheckProcessor.get(),
instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
}
}