This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1e6d1b38787 Improve CDC protocol (#24194)
1e6d1b38787 is described below
commit 1e6d1b38787750247fbd0ad26296f49fe94e3990
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Feb 16 22:08:06 2023 +0800
Improve CDC protocol (#24194)
* Rename subscription to streaming in pipeline CDC
* Remove database and incremental_global_orderly from the CDC request protocol
* Rename subscription_name to streaming_id and remove subscription_mode
* Refactor stream data, start job immediately
* Fix codestyle
* Remove source_schema
* Improve
---
.../data/pipeline/cdc/client/CDCClient.java | 11 +-
.../client/constant/ClientConnectionStatus.java | 4 +-
.../client/context/ClientConnectionContext.java | 4 +-
...SubscriptionEvent.java => StreamDataEvent.java} | 4 +-
...nRequestHandler.java => CDCRequestHandler.java} | 62 +++++-----
.../cdc/client/handler/LoginRequestHandler.java | 19 +--
.../client/parameter/StartCDCClientParameter.java | 11 +-
.../cdc/client/example/opengauss/Bootstrap.java | 9 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 29 +++--
...nJobParameter.java => StreamDataParameter.java} | 10 +-
.../cdc/config/job/CDCJobConfiguration.java | 6 +-
.../pipeline/cdc/constant/CDCConnectionStatus.java | 2 -
.../pipeline/cdc/context/CDCConnectionContext.java | 2 +
.../data/pipeline/cdc/core/job/CDCJobId.java | 11 +-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 3 +-
.../cdc/yaml/job/YamlCDCJobConfiguration.java | 6 +-
.../yaml/job/YamlCDCJobConfigurationSwapper.java | 9 +-
.../data/pipeline/cdc/core/job/CDCJobIdTest.java | 4 +-
.../job/YamlCDCJobConfigurationSwapperTest.java | 11 +-
.../src/main/proto/CDCRequestProtocol.proto | 62 ++++------
.../src/main/proto/CDCResponseProtocol.proto | 8 +-
.../backend/handler/cdc/CDCBackendHandler.java | 98 +++++++--------
.../backend/handler/cdc/CDCBackendHandlerTest.java | 23 ++--
.../frontend/netty/CDCChannelInboundHandler.java | 131 +++++++++++----------
.../netty/CDCChannelInboundHandlerTest.java | 13 +-
25 files changed, 263 insertions(+), 289 deletions(-)
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index fbb0a5c8f79..d8532932415 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -29,8 +29,8 @@ import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
-import org.apache.shardingsphere.data.pipeline.cdc.client.handler.SubscriptionRequestHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
@@ -57,11 +57,8 @@ public final class CDCClient {
if (null == parameter.getAddress() || parameter.getAddress().isEmpty()) {
throw new IllegalArgumentException("The address parameter can't be null");
}
- if (null == parameter.getSubscriptionMode()) {
- throw new IllegalArgumentException("The subscriptionMode parameter can't be null");
- }
- if (null == parameter.getSubscribeTables() || parameter.getSubscribeTables().isEmpty()) {
- throw new IllegalArgumentException("The subscribeTables parameter can't be null");
+ if (null == parameter.getSchemaTables() || parameter.getSchemaTables().isEmpty()) {
+ throw new IllegalArgumentException("The schema tables parameter can't be null");
}
}
@@ -88,7 +85,7 @@ public final class CDCClient {
channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
channel.pipeline().addLast(new ProtobufEncoder());
channel.pipeline().addLast(new LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
- channel.pipeline().addLast(new SubscriptionRequestHandler(parameter));
+ channel.pipeline().addLast(new CDCRequestHandler(parameter));
}
});
try {
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
index 55583b42b51..f644e5f59ff 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
@@ -28,7 +28,5 @@ public enum ClientConnectionStatus {
LOGGING_IN,
- CREATING_SUBSCRIPTION,
-
- SUBSCRIBING
+ STREAMING
}
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
index 77eaaf19854..fcbf086979b 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
@@ -26,10 +26,12 @@ import
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnect
* Client connection context.
*/
@Getter
+@Setter
public final class ClientConnectionContext {
public static final AttributeKey<ClientConnectionContext> CONTEXT_KEY =
AttributeKey.valueOf("client.context");
- @Setter
private volatile ClientConnectionStatus status;
+
+ private volatile String streamingId;
}
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
similarity index 92%
rename from
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
rename to
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
index 59bb67e898c..ff0a4198da8 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.client.event;
/**
- * Create subscription event.
+ * Stream data event.
*/
-public final class CreateSubscriptionEvent {
+public final class StreamDataEvent {
}
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
similarity index 57%
rename from
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
rename to
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index e7288950a8a..2f3367ec8ef 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -22,44 +22,46 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
import
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
-import
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import
org.apache.shardingsphere.data.pipeline.cdc.client.event.StreamDataEvent;
import
org.apache.shardingsphere.data.pipeline.cdc.client.importer.DataSourceImporter;
import org.apache.shardingsphere.data.pipeline.cdc.client.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import java.util.List;
/**
- * Subscription request handler.
+ * CDC request handler.
*/
@Slf4j
-public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapter {
+public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
private final StartCDCClientParameter parameter;
private final Importer importer;
- public SubscriptionRequestHandler(final StartCDCClientParameter parameter)
{
+ public CDCRequestHandler(final StartCDCClientParameter parameter) {
this.parameter = parameter;
importer = new DataSourceImporter(parameter.getDatabaseType(),
parameter.getImportDataSourceParameter());
}
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final
Object evt) {
- if (evt instanceof CreateSubscriptionEvent) {
- CreateSubscriptionRequest createSubscriptionRequest =
CreateSubscriptionRequest.newBuilder().setDatabase(parameter.getDatabase()).setSubscriptionMode(parameter.getSubscriptionMode())
-
.setSubscriptionName(parameter.getSubscriptionName()).addAllTableNames(parameter.getSubscribeTables()).setIncrementalGlobalOrderly(parameter.isIncrementalGlobalOrderly()).build();
- CDCRequest request =
CDCRequest.newBuilder().setCreateSubscription(createSubscriptionRequest).setRequestId(RequestIdUtil.generateRequestId()).build();
+ if (evt instanceof StreamDataEvent) {
+ StreamDataRequestBody streamDataRequestBody =
StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
+
.addAllSourceSchemaTables(parameter.getSchemaTables()).build();
+ CDCRequest request =
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
ctx.writeAndFlush(request);
}
}
@@ -71,33 +73,24 @@ public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapt
log.error("received error response {}", msg);
}
ClientConnectionContext connectionContext =
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
- if (connectionContext.getStatus() ==
ClientConnectionStatus.LOGGING_IN) {
- if (!response.hasCreateSubscriptionResult()) {
- log.error("not find the create subscription result");
- return;
- }
- sendCreateSubscriptionRequest(ctx, response, connectionContext);
- } else if (connectionContext.getStatus() ==
ClientConnectionStatus.CREATING_SUBSCRIPTION) {
- startSubscription(response, connectionContext);
- } else {
- subscribeDataRecords(ctx, response.getDataRecordResult());
+ if (response.hasStreamDataResult()) {
+ StreamDataResult streamDataResult = response.getStreamDataResult();
+
connectionContext.setStreamingId(streamDataResult.getStreamingId());
+ connectionContext.setStatus(ClientConnectionStatus.STREAMING);
+ } else if (response.hasDataRecordResult()) {
+ processDataRecords(ctx, response.getDataRecordResult());
}
}
- private void sendCreateSubscriptionRequest(final ChannelHandlerContext
ctx, final CDCResponse response, final ClientConnectionContext
connectionContext) {
- log.info("create subscription succeed, subscription name {}, exist
{}", response.getCreateSubscriptionResult().getSubscriptionName(),
response.getCreateSubscriptionResult().getExisting());
- StartSubscriptionRequest startSubscriptionRequest =
StartSubscriptionRequest.newBuilder().setDatabase(parameter.getDatabase()).setSubscriptionName(parameter.getSubscriptionName()).build();
- Builder builder =
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setStartSubscription(startSubscriptionRequest);
+ // TODO not remove the method, may be used again in the future
+ private void sendStartStreamingDataRequest(final ChannelHandlerContext
ctx, final String streamingId, final ClientConnectionContext connectionContext)
{
+ StartStreamingRequestBody startStreamingRequest =
StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
+ Builder builder =
CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setType(Type.START_STREAMING).setStartStreamingRequestBody(startStreamingRequest);
ctx.writeAndFlush(builder.build());
-
connectionContext.setStatus(ClientConnectionStatus.CREATING_SUBSCRIPTION);
- }
-
- private void startSubscription(final CDCResponse response, final
ClientConnectionContext connectionContext) {
- log.info("start subscription succeed, subscription name {}",
response.getCreateSubscriptionResult().getSubscriptionName());
- connectionContext.setStatus(ClientConnectionStatus.SUBSCRIBING);
+ connectionContext.setStatus(ClientConnectionStatus.STREAMING);
}
- private void subscribeDataRecords(final ChannelHandlerContext ctx, final
DataRecordResult result) {
+ private void processDataRecords(final ChannelHandlerContext ctx, final
DataRecordResult result) {
List<Record> recordsList = result.getRecordsList();
for (Record each : recordsList) {
try {
@@ -108,8 +101,7 @@ public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapt
throw new RuntimeException(ex);
}
}
- // TODO data needs to be processed, such as writing to a database
-
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setAckRequest(AckRequest.newBuilder().setAckId(result.getAckId()).build()).build());
+
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
}
@Override
@@ -120,7 +112,7 @@ public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapt
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final
Throwable cause) {
- log.error("subscription handler error", cause);
+ log.error("handler data streaming error", cause);
// TODO passing error messages to the caller
}
}
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index d14df086502..4f06830a576 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -24,13 +24,13 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
import
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
-import
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import
org.apache.shardingsphere.data.pipeline.cdc.client.event.StreamDataEvent;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.LoginType;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.LoginType;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
@@ -68,7 +68,7 @@ public final class LoginRequestHandler extends
ChannelInboundHandlerAdapter {
setLoginRequest(ctx, response, connectionContext);
break;
case NOT_LOGGED_IN:
- sendSubscriptionEvent(ctx, response, connectionContext);
+ sendStreamDataEvent(ctx, response, connectionContext);
break;
default:
ctx.fireChannelRead(msg);
@@ -79,18 +79,19 @@ public final class LoginRequestHandler extends
ChannelInboundHandlerAdapter {
ServerGreetingResult serverGreetingResult =
response.getServerGreetingResult();
log.info("Server greeting result, server version: {}, protocol
version: {}", serverGreetingResult.getServerVersion(),
serverGreetingResult.getProtocolVersion());
String encryptPassword =
Hashing.sha256().hashBytes(password.getBytes()).toString().toUpperCase();
- LoginRequest loginRequest =
LoginRequest.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build()).build();
+ LoginRequestBody loginRequestBody =
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build())
+ .build();
String loginRequestId = RequestIdUtil.generateRequestId();
- CDCRequest data =
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLogin(loginRequest).build();
+ CDCRequest data =
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLoginRequestBody(loginRequestBody).build();
ctx.writeAndFlush(data);
connectionContext.setStatus(ClientConnectionStatus.NOT_LOGGED_IN);
}
- private void sendSubscriptionEvent(final ChannelHandlerContext ctx, final
CDCResponse response, final ClientConnectionContext connectionContext) {
+ private void sendStreamDataEvent(final ChannelHandlerContext ctx, final
CDCResponse response, final ClientConnectionContext connectionContext) {
if (response.getStatus() == Status.SUCCEED) {
log.info("login success, username {}", username);
connectionContext.setStatus(ClientConnectionStatus.LOGGING_IN);
- ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+ ctx.fireUserEventTriggered(new StreamDataEvent());
} else {
log.error("login failed, username {}", username);
}
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index a8c916f7b0c..4725a2dbdd3 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -20,8 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import java.util.List;
@@ -45,13 +44,9 @@ public final class StartCDCClientParameter {
private String database;
- private List<TableName> subscribeTables;
+ private List<SchemaTable> schemaTables;
- private String subscriptionName;
-
- private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
-
- private boolean incrementalGlobalOrderly;
+ private boolean full;
private final ImportDataSourceParameter importDataSourceParameter;
}
diff --git
a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
index b86b112057e..5bafbec1ecf 100644
---
a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
+++
b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
@@ -20,8 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import java.util.Collections;
@@ -40,10 +39,8 @@ public final class Bootstrap {
parameter.setUsername("root");
parameter.setPassword("root");
parameter.setDatabase("sharding_db");
- parameter.setSubscriptionMode(SubscriptionMode.FULL);
- parameter.setSubscriptionName("subscribe_sharding_db");
- parameter.setIncrementalGlobalOrderly(true);
-
parameter.setSubscribeTables(Collections.singletonList(TableName.newBuilder().setName("t_order").build()));
+ parameter.setFull(true);
+
parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
parameter.setDatabaseType("openGauss");
CDCClient cdcClient = new CDCClient(parameter);
cdcClient.start();
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bee9d5c08f6..bf7b9e97c9d 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -45,13 +45,12 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
-import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
+import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -107,14 +106,13 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
* Create CDC job config.
*
* @param param create CDC job param
- * @return true if job is not exist, otherwise false
+ * @return job id
*/
- public boolean createJob(final CreateSubscriptionJobParameter param) {
+ public String createJob(final StreamDataParameter param) {
YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
yamlJobConfig.setDatabase(param.getDatabase());
- yamlJobConfig.setTableNames(param.getSubscribeTableNames());
- yamlJobConfig.setSubscriptionName(param.getSubscriptionName());
- yamlJobConfig.setSubscriptionMode(param.getSubscriptionMode());
+ yamlJobConfig.setSchemaTableNames(param.getSchemaTableNames());
+ yamlJobConfig.setFull(param.isFull());
yamlJobConfig.setDecodeWithTX(param.isDecodeWithTX());
ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabase());
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
@@ -130,16 +128,16 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
String jobConfigKey =
PipelineMetaDataNode.getJobConfigPath(jobConfig.getJobId());
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("cdc job already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
- return false;
+ return jobConfig.getJobId();
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getJobClassName());
JobConfigurationPOJO jobConfigPOJO =
convertJobConfiguration(jobConfig);
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
- if
(SubscriptionMode.INCREMENTAL.name().equals(param.getSubscriptionMode())) {
+ if (!param.isFull()) {
initIncrementalPosition(jobConfig);
}
- return true;
+ return jobConfig.getJobId();
}
private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
@@ -150,7 +148,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
if (getJobItemProgress(jobId, i).isPresent()) {
continue;
}
- TableNameSchemaNameMapping tableNameSchemaNameMapping =
getTableNameSchemaNameMapping(jobConfig.getTableNames());
+ TableNameSchemaNameMapping tableNameSchemaNameMapping =
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig, i, tableNameSchemaNameMapping);
InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
@@ -195,23 +193,24 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
}
private String generateJobId(final YamlCDCJobConfiguration config) {
- CDCJobId jobId = new CDCJobId(config.getDatabase(),
config.getSubscriptionName());
+ // TODO generate parameter add sink type
+ CDCJobId jobId = new CDCJobId(config.getDatabase(),
config.getSchemaTableNames(), config.isFull());
return marshalJobId(jobId);
}
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
CDCJobId jobId = (CDCJobId) pipelineJobId;
- String text = Joiner.on('|').join(jobId.getDatabaseName(),
jobId.getSubscriptionName());
+ String text = Joiner.on('|').join(jobId.getDatabaseName(),
jobId.getTableNames(), jobId.isFull());
return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
}
@Override
public CDCTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration)
pipelineJobConfig;
- TableNameSchemaNameMapping tableNameSchemaNameMapping =
getTableNameSchemaNameMapping(jobConfig.getTableNames());
+ TableNameSchemaNameMapping tableNameSchemaNameMapping =
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig,
jobShardingItem, tableNameSchemaNameMapping);
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
jobConfig.getTableNames(), tableNameSchemaNameMapping);
+ ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping);
CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig,
importerConfig);
log.debug("buildTaskConfiguration, result={}", result);
return result;
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
similarity index 84%
rename from
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
rename to
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
index 79c30206e54..a62bf5cda31 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/StreamDataParameter.java
@@ -25,19 +25,17 @@ import java.util.List;
import java.util.Map;
/**
- * Create subscription job parameter.
+ * Stream data parameter.
*/
@RequiredArgsConstructor
@Getter
-public final class CreateSubscriptionJobParameter {
+public final class StreamDataParameter {
private final String database;
- private final List<String> subscribeTableNames;
+ private final List<String> schemaTableNames;
- private final String subscriptionName;
-
- private final String subscriptionMode;
+ private final boolean full;
private final Map<String, List<DataNode>> dataNodesMap;
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index d4c628ac874..7a04f3d7bca 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -36,11 +36,9 @@ public final class CDCJobConfiguration implements
PipelineJobConfiguration {
private final String database;
- private final List<String> tableNames;
+ private final List<String> schemaTableNames;
- private final String subscriptionName;
-
- private final String subscriptionMode;
+ private final boolean full;
private final String sourceDatabaseType;
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
index 09ce0e484db..4db5e6a54e6 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
@@ -25,6 +25,4 @@ public enum CDCConnectionStatus {
NOT_LOGGED_IN,
LOGGED_IN,
-
- SUBSCRIBED
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
index 8abb4a70f73..51b689790bf 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
@@ -31,6 +31,8 @@ public final class CDCConnectionContext {
private volatile CDCConnectionStatus status;
+ private volatile String database;
+
private volatile String jobId;
private volatile ShardingSphereUser currentUser;
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
index e838129b8b0..0182c35a74b 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
@@ -22,6 +22,8 @@ import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import java.util.List;
+
/**
* CDC job id.
*/
@@ -33,11 +35,14 @@ public final class CDCJobId extends AbstractPipelineJobId {
private final String databaseName;
- private final String subscriptionName;
+ private final List<String> tableNames;
+
+ private final boolean full;
- public CDCJobId(final String databaseName, final String subscriptionName) {
+ public CDCJobId(final String databaseName, final List<String> tableNames,
final boolean full) {
super(new CDCJobType(), CURRENT_VERSION);
this.databaseName = databaseName;
- this.subscriptionName = subscriptionName;
+ this.tableNames = tableNames;
+ this.full = full;
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index cbcee655918..8f789c6c2bf 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
@@ -70,7 +69,7 @@ public final class CDCJobPreparer {
}
initIncrementalTasks(jobItemContext);
CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
- if
(SubscriptionMode.FULL.name().equals(jobConfig.getSubscriptionMode())) {
+ if (jobConfig.isFull()) {
initInventoryTasks(jobItemContext);
}
if (needUpdateJobStatus) {
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index 9a02e7b853e..2d75a23931c 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -35,11 +35,9 @@ public final class YamlCDCJobConfiguration implements
YamlPipelineJobConfigurati
private String database;
- private List<String> tableNames;
+ private List<String> schemaTableNames;
- private String subscriptionName;
-
- private String subscriptionMode;
+ private boolean full;
private String sourceDatabaseType;
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 14276bbed7b..8555a1e765c 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -40,9 +40,8 @@ public final class YamlCDCJobConfigurationSwapper implements
YamlConfigurationSw
YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
result.setJobId(data.getJobId());
result.setDatabase(data.getDatabase());
- result.setTableNames(data.getTableNames());
- result.setSubscriptionName(data.getSubscriptionName());
- result.setSubscriptionMode(data.getSubscriptionMode());
+ result.setSchemaTableNames(data.getSchemaTableNames());
+ result.setFull(data.isFull());
result.setSourceDatabaseType(data.getSourceDatabaseType());
result.setDataSourceConfiguration(dataSourceConfigSwapper.swapToYamlConfiguration(data.getDataSourceConfig()));
result.setTablesFirstDataNodes(null == data.getTablesFirstDataNodes()
? null : data.getTablesFirstDataNodes().marshal());
@@ -60,8 +59,8 @@ public final class YamlCDCJobConfigurationSwapper implements
YamlConfigurationSw
? Collections.emptyList()
:
yamlConfig.getJobShardingDataNodes().stream().map(JobDataNodeLine::unmarshal).collect(Collectors.toList());
JobDataNodeLine tablesFirstDataNodes = null ==
yamlConfig.getTablesFirstDataNodes() ? null :
JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
- return new CDCJobConfiguration(yamlConfig.getJobId(),
yamlConfig.getDatabase(), yamlConfig.getTableNames(),
yamlConfig.getSubscriptionName(), yamlConfig.getSubscriptionMode(),
- yamlConfig.getSourceDatabaseType(),
(ShardingSpherePipelineDataSourceConfiguration)
dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()),
tablesFirstDataNodes,
+ return new CDCJobConfiguration(yamlConfig.getJobId(),
yamlConfig.getDatabase(), yamlConfig.getSchemaTableNames(),
yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
+ (ShardingSpherePipelineDataSourceConfiguration)
dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()),
tablesFirstDataNodes,
jobShardingDataNodes, yamlConfig.isDecodeWithTX(),
yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index a06730170bd..408610f365a 100644
---
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -22,6 +22,8 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.junit.Test;
+import java.util.Arrays;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -29,7 +31,7 @@ public final class CDCJobIdTest {
@Test
public void parseJobType() {
- CDCJobId pipelineJobId = new CDCJobId("sharding_db", "test");
+ CDCJobId pipelineJobId = new CDCJobId("sharding_db",
Arrays.asList("test", "t_order"), false);
String jobId =
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
assertThat(actualJobType, instanceOf(CDCJobType.class));
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index 51ab34e39dc..c127a9e8798 100644
---
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
public final class YamlCDCJobConfigurationSwapperTest {
@@ -32,14 +33,12 @@ public final class YamlCDCJobConfigurationSwapperTest {
YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
yamlJobConfig.setJobId("j51017f973ac82cb1edea4f5238a258c25e89");
yamlJobConfig.setDatabase("test_db");
- yamlJobConfig.setTableNames(Arrays.asList("t_order", "t_order_item"));
- yamlJobConfig.setSubscriptionName("test_name");
- yamlJobConfig.setSubscriptionMode("FULL");
+ yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order",
"t_order_item"));
+ yamlJobConfig.setFull(true);
CDCJobConfiguration actual = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
assertThat(actual.getJobId(),
is("j51017f973ac82cb1edea4f5238a258c25e89"));
assertThat(actual.getDatabase(), is("test_db"));
- assertThat(actual.getTableNames(), is(Arrays.asList("t_order",
"t_order_item")));
- assertThat(actual.getSubscriptionName(), is("test_name"));
- assertThat(actual.getSubscriptionMode(), is("FULL"));
+ assertThat(actual.getSchemaTableNames(),
is(Arrays.asList("test.t_order", "t_order_item")));
+ assertTrue(actual.isFull());
}
}
diff --git
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 00b3c76c452..02a40d52f19 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -27,24 +27,24 @@ message CDCRequest {
enum Type {
UNKNOWN = 0;
LOGIN = 1;
- CREATE_SUBSCRIPTION = 2;
- START_SUBSCRIPTION = 3;
- ACK = 4;
- STOP_SUBSCRIPTION = 5;
- DROP_SUBSCRIPTION = 6;
+ STREAM_DATA = 2;
+ ACK_STREAMING = 3;
+ STOP_STREAMING = 4;
+ START_STREAMING = 5;
+ DROP_STREAMING = 6;
}
Type type = 3;
- oneof request {
- LoginRequest login = 4;
- CreateSubscriptionRequest create_subscription = 5;
- StartSubscriptionRequest start_subscription = 6;
- AckRequest ack_request = 7;
- StopSubscriptionRequest stop_subscription = 8;
- DropSubscriptionRequest drop_subscription = 9;
+ oneof request_body {
+ LoginRequestBody login_request_body = 4;
+ StreamDataRequestBody stream_data_request_body = 5;
+ AckStreamingRequestBody ack_streaming_request_body = 6;
+ StopStreamingRequestBody stop_streaming_request_body = 7;
+ StartStreamingRequestBody start_streaming_request_body = 8;
+ DropStreamingRequestBody drop_streaming_request_body = 9;
}
}
-message LoginRequest {
+message LoginRequestBody {
enum LoginType {
UNKNOWN = 0;
BASIC = 1;
@@ -60,38 +60,28 @@ message LoginRequest {
}
}
-message CreateSubscriptionRequest {
+message StreamDataRequestBody {
string database = 1;
- message TableName {
+ message SchemaTable {
string schema = 1;
- string name = 2;
+ string table = 2;
}
- repeated TableName table_names = 2;
- string subscription_name = 3;
- enum SubscriptionMode {
- UNKNOWN = 0;
- INCREMENTAL = 1;
- FULL = 2;
- }
- SubscriptionMode subscription_mode = 4;
- bool incremental_global_orderly = 5;
+ repeated SchemaTable source_schema_tables = 2;
+ bool full = 3;
}
-message StartSubscriptionRequest {
- string database = 1;
- string subscription_name = 2;
+message AckStreamingRequestBody {
+ string ack_id = 1;
}
-message StopSubscriptionRequest {
- string database = 1;
- string subscription_name = 2;
+message StopStreamingRequestBody {
+ string streaming_id = 1;
}
-message DropSubscriptionRequest {
- string database = 1;
- string subscription_name = 2;
+message StartStreamingRequestBody {
+ string streaming_id = 1;
}
-message AckRequest {
- string ack_id = 3;
+message DropStreamingRequestBody {
+ string streaming_id = 1;
}
diff --git
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
index dd9a8158918..56b5969d9b8 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -33,10 +33,9 @@ message CDCResponse {
Status status = 2;
oneof response {
ServerGreetingResult server_greeting_result = 3;
- CreateSubscriptionResult create_subscription_result = 4;
+ StreamDataResult stream_data_result = 4;
DataRecordResult data_record_result = 5;
}
-
optional string error_code = 14;
optional string error_message = 15;
}
@@ -46,9 +45,8 @@ message ServerGreetingResult {
string protocol_version = 2;
}
-message CreateSubscriptionResult {
- string subscription_name = 1;
- bool existing = 2;
+message StreamDataResult {
+ string streaming_id = 1;
}
message NullValue {
diff --git
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 1137e669816..8707da0276a 100644
---
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -22,29 +22,26 @@ import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
-import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
+import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
-import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import
org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CreateSubscriptionResult;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
@@ -67,39 +64,38 @@ public final class CDCBackendHandler {
private final CDCJobAPI jobAPI = new CDCJobAPI();
/**
- * Create subscription.
+ * Stream data.
*
- * @param request CDC request
+ * @param requestId request id
+ * @param requestBody stream data request body
+ * @param connectionContext connection context
+ * @param channel channel
* @return CDC response
*/
- public CDCResponse createSubscription(final CDCRequest request) {
- CreateSubscriptionRequest createSubscription =
request.getCreateSubscription();
- ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(createSubscription.getDatabase());
+ public CDCResponse streamData(final String requestId, final
StreamDataRequestBody requestBody, final CDCConnectionContext
connectionContext, final Channel channel) {
+ ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
if (null == database) {
- return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists",
createSubscription.getDatabase()));
+ return CDCResponseGenerator.failed(requestId,
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists",
requestBody.getDatabase()));
}
- List<String> tableNames = new LinkedList<>();
- for (TableName each : createSubscription.getTableNamesList()) {
- tableNames.add(Strings.isNullOrEmpty(each.getSchema()) ?
each.getName() : String.join(".", each.getSchema(), each.getName()));
+ List<String> schemaTableNames = new LinkedList<>();
+ for (SchemaTable each : requestBody.getSourceSchemaTablesList()) {
+ schemaTableNames.add(Strings.isNullOrEmpty(each.getSchema()) ?
each.getTable() : String.join(".", each.getSchema(), each.getTable()));
}
Optional<ShardingRule> shardingRule =
database.getRuleMetaData().findSingleRule(ShardingRule.class);
if (!shardingRule.isPresent()) {
- return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
+ return CDCResponseGenerator.failed(requestId,
CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
}
Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
// TODO need support case-insensitive later
- for (TableName each : createSubscription.getTableNamesList()) {
- actualDataNodesMap.put(each.getName(),
getActualDataNodes(shardingRule.get(), each.getName()));
- }
- CreateSubscriptionJobParameter parameter = new
CreateSubscriptionJobParameter(createSubscription.getDatabase(), tableNames,
createSubscription.getSubscriptionName(),
- createSubscription.getSubscriptionMode().name(),
actualDataNodesMap, createSubscription.getIncrementalGlobalOrderly());
- if (jobAPI.createJob(parameter)) {
- return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
-
.setSubscriptionName(createSubscription.getSubscriptionName()).setExisting(false).build()).build();
- } else {
- return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
-
.setSubscriptionName(createSubscription.getSubscriptionName()).setExisting(true).build()).build();
+ for (SchemaTable each : requestBody.getSourceSchemaTablesList()) {
+ actualDataNodesMap.put(each.getTable(),
getActualDataNodes(shardingRule.get(), each.getTable()));
}
+ boolean decodeWithTx = database.getProtocolType() instanceof
OpenGaussDatabaseType;
+ StreamDataParameter parameter = new
StreamDataParameter(requestBody.getDatabase(), schemaTableNames,
requestBody.getFull(), actualDataNodesMap, decodeWithTx);
+ String jobId = jobAPI.createJob(parameter);
+ connectionContext.setJobId(jobId);
+ startStreaming(requestId, jobId, connectionContext, channel);
+ return
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
}
private List<DataNode> getActualDataNodes(final ShardingRule shardingRule,
final String logicTableName) {
@@ -109,20 +105,29 @@ public final class CDCBackendHandler {
}
/**
- * Start subscription.
+ * Get database name by job id.
*
- * @param request request
+ * @param jobId job id
+ * @return database name
+ */
+ public String getDatabaseByJobId(final String jobId) {
+ return ((CDCJobConfiguration)
jobAPI.getJobConfiguration(jobId)).getDatabase();
+ }
+
+ /**
+ * Start streaming.
+ *
+ * @param requestId request id
+ * @param jobId job id
* @param channel channel
* @param connectionContext connection context
* @return CDC response
*/
- public CDCResponse startSubscription(final CDCRequest request, final
Channel channel, final CDCConnectionContext connectionContext) {
- StartSubscriptionRequest startSubscriptionRequest =
request.getStartSubscription();
- String jobId = jobAPI.marshalJobId(new
CDCJobId(startSubscriptionRequest.getDatabase(),
startSubscriptionRequest.getSubscriptionName()));
+ // TODO do not return CDCResponse here; streamData discards the returned value
+ public CDCResponse startStreaming(final String requestId, final String
jobId, final CDCConnectionContext connectionContext, final Channel channel) {
CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration)
jobAPI.getJobConfiguration(jobId);
if (null == cdcJobConfig) {
- return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config
doesn't exist",
- startSubscriptionRequest.getSubscriptionName()));
+ return CDCResponseGenerator.failed(jobId,
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config
doesn't exist", jobId));
}
JobConfigurationPOJO jobConfigPOJO =
PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
// TODO, ensure that there is only one consumer at a time, job config
disable may not be updated when the program is forced to close
@@ -132,22 +137,21 @@ public final class CDCBackendHandler {
Comparator<DataRecord> dataRecordComparator =
cdcJobConfig.isDecodeWithTX()
?
DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
: null;
- CDCJob job = new CDCJob(new CDCImporterConnector(channel,
cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(),
cdcJobConfig.getTableNames(), dataRecordComparator));
+ CDCJob job = new CDCJob(new CDCImporterConnector(channel,
cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(),
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
- connectionContext.setStatus(CDCConnectionStatus.SUBSCRIBED);
connectionContext.setJobId(jobId);
- return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).build();
+ return CDCResponseGenerator.succeedBuilder(requestId).build();
}
/**
- * Stop subscription.
+ * Stop streaming.
*
* @param jobId job id
*/
- public void stopSubscription(final String jobId) {
+ public void stopStreaming(final String jobId) {
if (Strings.isNullOrEmpty(jobId)) {
log.warn("job id is null or empty, ignored");
return;
@@ -159,21 +163,21 @@ public final class CDCBackendHandler {
}
/**
- * Drop subscription.
+ * Drop streaming.
*
* @param jobId job id.
* @throws SQLException sql exception
*/
- public void dropSubscription(final String jobId) throws SQLException {
+ public void dropStreaming(final String jobId) throws SQLException {
jobAPI.rollback(jobId);
}
/**
* Process ack.
*
- * @param ackRequest ack request
+ * @param requestBody ack streaming request body
*/
- public void processAck(final AckRequest ackRequest) {
- CDCAckHolder.getInstance().ack(ackRequest.getAckId());
+ public void processAck(final AckStreamingRequestBody requestBody) {
+ CDCAckHolder.getInstance().ack(requestBody.getAckId());
}
}
diff --git
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
index c650d2f6ced..6bd48cb8efa 100644
---
a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
+++
b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
@@ -17,8 +17,11 @@
package org.apache.shardingsphere.proxy.backend.handler.cdc;
+import io.netty.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -35,7 +38,6 @@ import
org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.MockedStatic;
@@ -84,20 +86,9 @@ public final class CDCBackendHandlerTest {
}
@Test
- public void assertCreateSubscriptionFailed() {
- CDCRequest request =
CDCRequest.newBuilder().setRequestId("1").setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("none")).build();
- CDCResponse actualResponse = handler.createSubscription(request);
+ public void assertStreamDataRequestFailed() {
+ CDCRequest request =
CDCRequest.newBuilder().setRequestId("1").setType(Type.STREAM_DATA).setStreamDataRequestBody(StreamDataRequestBody.newBuilder().setDatabase("none")).build();
+ CDCResponse actualResponse =
handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(),
mock(CDCConnectionContext.class), mock(Channel.class));
assertThat(actualResponse.getStatus(), is(Status.FAILED));
}
-
- // TODO ignore for now, it need more mock, since SPI is removed. It's
better to put it in E2E test
- @Ignore
- @Test
- public void assertCreateSubscriptionSucceed() {
- String requestId = "1";
- CDCRequest request =
CDCRequest.newBuilder().setRequestId(requestId).setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("sharding_db")).build();
- CDCResponse actualResponse = handler.createSubscription(request);
- assertThat(actualResponse.getStatus(), is(Status.SUCCEED));
- assertThat(actualResponse.getRequestId(), is(requestId));
- }
}
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index e07a1a50a60..1c9a9859269 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -30,14 +30,13 @@ import
org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropSubscriptionRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopSubscriptionRequest;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
import
org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
@@ -80,7 +79,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
public void channelInactive(final ChannelHandlerContext ctx) {
CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
if (null != connectionContext.getJobId()) {
- backendHandler.stopSubscription(connectionContext.getJobId());
+ backendHandler.stopStreaming(connectionContext.getJobId());
}
ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
}
@@ -112,33 +111,33 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
processLogin(ctx, request, connectionContext);
return;
}
- switch (request.getRequestCase()) {
- case CREATE_SUBSCRIPTION:
- processCreateSubscription(ctx, request, connectionContext);
+ switch (request.getType()) {
+ case STREAM_DATA:
+ processStreamDataRequest(ctx, request, connectionContext);
break;
- case START_SUBSCRIPTION:
- processStartSubscription(ctx, request, connectionContext);
+ case ACK_STREAMING:
+ processAckStreamingRequest(ctx, request);
break;
- case STOP_SUBSCRIPTION:
- processStopSubscription(ctx, request, connectionContext);
+ case STOP_STREAMING:
+ processStopStreamingRequest(ctx, request, connectionContext);
break;
- case DROP_SUBSCRIPTION:
- processDropSubscription(ctx, request, connectionContext);
+ case START_STREAMING:
+ processStartStreamingRequest(ctx, request, connectionContext);
break;
- case ACK_REQUEST:
- processAckRequest(ctx, request);
+ case DROP_STREAMING:
+ processDropStreamingRequest(ctx, request, connectionContext);
break;
default:
- log.warn("Cannot handle this type of request {}", request);
+ log.warn("can't handle this type of request {}", request);
}
}
private void processLogin(final ChannelHandlerContext ctx, final
CDCRequest request, final CDCConnectionContext connectionContext) {
- if (!request.hasLogin() || !request.getLogin().hasBasicBody()) {
+ if (!request.hasLoginRequestBody() ||
!request.getLoginRequestBody().hasBasicBody()) {
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request
body")).addListener(ChannelFutureListener.CLOSE);
return;
}
- BasicBody body = request.getLogin().getBasicBody();
+ BasicBody body = request.getLoginRequestBody().getBasicBody();
AuthorityRule authorityRule =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
Optional<ShardingSphereUser> user = authorityRule.findUser(new
Grantee(body.getUsername(), getHostAddress(ctx)));
if (user.isPresent() &&
Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(),
body.getPassword())) {
@@ -164,69 +163,79 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
return socketAddress instanceof InetSocketAddress ?
((InetSocketAddress) socketAddress).getAddress().getHostAddress() :
socketAddress.toString();
}
- private void processCreateSubscription(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
- if (!request.hasCreateSubscription()) {
-
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss create subscription request
body"))
- .addListener(ChannelFutureListener.CLOSE);
+ private void processStreamDataRequest(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
+ if (!request.hasStreamDataRequestBody()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
+ return;
+ }
+ StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
+ if (requestBody.getDatabase().isEmpty()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be
empty"));
return;
}
- CreateSubscriptionRequest createSubscriptionRequest =
request.getCreateSubscription();
- if (createSubscriptionRequest.getTableNamesList().isEmpty() ||
createSubscriptionRequest.getDatabase().isEmpty() ||
createSubscriptionRequest.getSubscriptionName().isEmpty()
- || createSubscriptionRequest.getSubscriptionMode() ==
SubscriptionMode.UNKNOWN) {
-
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription
request parameter"));
+ // TODO need support the all tables at database or schema
+ if (requestBody.getSourceSchemaTablesList().isEmpty()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request
parameter"));
return;
}
- checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
createSubscriptionRequest.getDatabase());
- CDCResponse response = backendHandler.createSubscription(request);
+ checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
requestBody.getDatabase());
+ CDCResponse response =
backendHandler.streamData(request.getRequestId(), requestBody,
connectionContext, ctx.channel());
ctx.writeAndFlush(response);
}
- private void processStartSubscription(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
- if (!request.hasStartSubscription()) {
-
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start subscription request
body"))
+ private void processAckStreamingRequest(final ChannelHandlerContext ctx,
final CDCRequest request) {
+ if (!request.hasAckStreamingRequestBody()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss ack request
body")).addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ AckStreamingRequestBody requestBody =
request.getAckStreamingRequestBody();
+ if (requestBody.getAckId().isEmpty()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal ack request parameter"));
+ return;
+ }
+ backendHandler.processAck(requestBody);
+ }
+
+ private void processStartStreamingRequest(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
+ if (!request.hasStartStreamingRequestBody()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start streaming request
body"))
.addListener(ChannelFutureListener.CLOSE);
return;
}
- StartSubscriptionRequest startSubscriptionRequest =
request.getStartSubscription();
- if (startSubscriptionRequest.getDatabase().isEmpty() ||
startSubscriptionRequest.getSubscriptionName().isEmpty()) {
-
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start subscription request
parameter"))
+ StartStreamingRequestBody requestBody =
request.getStartStreamingRequestBody();
+ // TODO improve after cdc exception refactor
+ if (requestBody.getStreamingId().isEmpty()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start streaming request
parameter"))
.addListener(ChannelFutureListener.CLOSE);
return;
}
- checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
startSubscriptionRequest.getDatabase());
- CDCResponse response = backendHandler.startSubscription(request,
ctx.channel(), connectionContext);
+ String database =
backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+ checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
database);
+ CDCResponse response =
backendHandler.startStreaming(request.getRequestId(),
requestBody.getStreamingId(), connectionContext, ctx.channel());
ctx.writeAndFlush(response);
}
- private void processStopSubscription(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
- StopSubscriptionRequest stopSubscriptionRequest =
request.getStopSubscription();
- checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
stopSubscriptionRequest.getDatabase());
- backendHandler.stopSubscription(connectionContext.getJobId());
+ private void processStopStreamingRequest(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
+ StopStreamingRequestBody requestBody =
request.getStopStreamingRequestBody();
+ String database =
backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+ checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
database);
+ backendHandler.stopStreaming(connectionContext.getJobId());
connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+ connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}
- private void processDropSubscription(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
- DropSubscriptionRequest dropSubscriptionRequest =
request.getDropSubscription();
- checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
dropSubscriptionRequest.getDatabase());
+ private void processDropStreamingRequest(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
+ DropStreamingRequestBody requestBody =
request.getDropStreamingRequestBody();
+ String database =
backendHandler.getDatabaseByJobId(requestBody.getStreamingId());
+ checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
database);
try {
- backendHandler.dropSubscription(connectionContext.getJobId());
+ backendHandler.dropStreaming(connectionContext.getJobId());
+ connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+ connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
} catch (final SQLException ex) {
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, ex.getMessage()));
}
}
-
- private void processAckRequest(final ChannelHandlerContext ctx, final
CDCRequest request) {
- if (!request.hasAckRequest()) {
-
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss ack request
body")).addListener(ChannelFutureListener.CLOSE);
- return;
- }
- AckRequest ackRequest = request.getAckRequest();
- if (ackRequest.getAckId().isEmpty()) {
-
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal ack request parameter"));
- return;
- }
- backendHandler.processAck(ackRequest);
- }
}
diff --git
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index 3942246d752..603f4f0468c 100644
---
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -24,8 +24,9 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
@@ -82,7 +83,8 @@ public final class CDCChannelInboundHandlerTest {
@Test
public void assertLoginRequestFailed() {
- CDCRequest actualRequest =
CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build()).build()).build();
+ CDCRequest actualRequest =
CDCRequest.newBuilder().setType(Type.LOGIN).setLoginRequestBody(LoginRequestBody.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build())
+ .build()).build();
channel.writeInbound(actualRequest);
CDCResponse expectedGreetingResult = channel.readOutbound();
assertTrue(expectedGreetingResult.hasServerGreetingResult());
@@ -94,7 +96,7 @@ public final class CDCChannelInboundHandlerTest {
@Test
public void assertIllegalLoginRequest() {
- CDCRequest actualRequest =
CDCRequest.newBuilder().setVersion(1).setRequestId("test").build();
+ CDCRequest actualRequest =
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId("test").build();
channel.writeInbound(actualRequest);
CDCResponse expectedGreetingResult = channel.readOutbound();
assertTrue(expectedGreetingResult.hasServerGreetingResult());
@@ -107,7 +109,8 @@ public final class CDCChannelInboundHandlerTest {
@Test
public void assertLoginRequestSucceed() {
String encryptPassword =
Hashing.sha256().hashBytes("root".getBytes()).toString().toUpperCase();
- Builder builder =
CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root").setPassword(encryptPassword).build()).build());
+ Builder builder =
CDCRequest.newBuilder().setType(Type.LOGIN).setLoginRequestBody(LoginRequestBody.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root")
+ .setPassword(encryptPassword).build()).build());
CDCRequest actualRequest = builder.build();
channel.writeInbound(actualRequest);
CDCResponse expectedGreetingResult = channel.readOutbound();