This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c3e546f0f69 Refactor CDCClient (#37372)
c3e546f0f69 is described below
commit c3e546f0f6983bb1bcdfff1d87ba1aaef8f05906
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 13 13:22:55 2025 +0800
Refactor CDCClient (#37372)
* Refactor CDCClient
* Refactor CDCClient
---
.../data/pipeline/cdc/client/CDCClient.java | 45 +++++++---------------
.../cdc/client/config/CDCClientConfiguration.java | 11 +++++-
2 files changed, 22 insertions(+), 34 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 163a4de8fc2..b18fce5aa22 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.client;
+import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -29,6 +30,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
@@ -59,6 +61,7 @@ import java.util.function.Consumer;
/**
* CDC client.
*/
+@RequiredArgsConstructor
@Slf4j
public final class CDCClient implements AutoCloseable {
@@ -68,20 +71,6 @@ public final class CDCClient implements AutoCloseable {
private Channel channel;
- public CDCClient(final CDCClientConfiguration config) {
- validateParameter(config);
- this.config = config;
- }
-
- private void validateParameter(final CDCClientConfiguration parameter) {
- if (null == parameter.getAddress() ||
parameter.getAddress().isEmpty()) {
- throw new IllegalArgumentException("The address parameter can't be
null");
- }
- if (parameter.getPort() <= 0) {
- throw new IllegalArgumentException("The port must be greater than
0");
- }
- }
-
/**
* Connect.
*
@@ -127,13 +116,11 @@ public final class CDCClient implements AutoCloseable {
* @throws IllegalStateException the channel is not active
*/
public synchronized void login(final CDCLoginParameter parameter) {
- checkChannelActive();
+ Preconditions.checkState(null != channel && channel.isActive(), "The
channel is not active, call the `connect` method first.");
ClientConnectionContext connectionContext =
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
- if (ClientConnectionStatus.LOGGED_IN ==
connectionContext.getStatus().get()) {
- throw new IllegalStateException("The client is already logged in");
- }
- LoginRequestBody loginRequestBody =
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
-
.setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+ Preconditions.checkState(ClientConnectionStatus.LOGGED_IN !=
connectionContext.getStatus().get(), "The client is already logged in.");
+ LoginRequestBody loginRequestBody =
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder()
+
.setUsername(parameter.getUsername()).setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
String requestId = RequestIdUtils.generateRequestId();
CDCRequest data =
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
ResponseFuture responseFuture = new ResponseFuture(requestId,
Type.LOGIN);
@@ -143,21 +130,15 @@ public final class CDCClient implements AutoCloseable {
log.info("Login success, username: {}", parameter.getUsername());
}
- private void checkChannelActive() {
- if (null == channel || !channel.isActive()) {
- throw new IllegalStateException("The channel is not active, call
the `connect` method first");
- }
- }
-
/**
* Start streaming.
*
* @param parameter parameter
- * @return streaming id
+ * @return streaming ID
*/
public String startStreaming(final StartStreamingParameter parameter) {
- StreamDataRequestBody streamDataRequestBody =
StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
- .addAllSourceSchemaTable(parameter.getSchemaTables()).build();
+ StreamDataRequestBody streamDataRequestBody =
StreamDataRequestBody.newBuilder()
+
.setDatabase(parameter.getDatabase()).setFull(parameter.isFull()).addAllSourceSchemaTable(parameter.getSchemaTables()).build();
String requestId = RequestIdUtils.generateRequestId();
CDCRequest request =
CDCRequest.newBuilder().setRequestId(requestId).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
ClientConnectionContext connectionContext =
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
@@ -172,7 +153,7 @@ public final class CDCClient implements AutoCloseable {
/**
* Restart streaming.
*
- * @param streamingId streaming id
+ * @param streamingId streaming ID
*/
public void restartStreaming(final String streamingId) {
String requestId = RequestIdUtils.generateRequestId();
@@ -189,7 +170,7 @@ public final class CDCClient implements AutoCloseable {
/**
* Stop streaming.
*
- * @param streamingId streaming id
+ * @param streamingId streaming ID
*/
public void stopStreaming(final String streamingId) {
String requestId = RequestIdUtils.generateRequestId();
@@ -207,7 +188,7 @@ public final class CDCClient implements AutoCloseable {
/**
* Drop streaming.
*
- * @param streamingId streaming id
+ * @param streamingId streaming ID
*/
public void dropStreaming(final String streamingId) {
String requestId = RequestIdUtils.generateRequestId();
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
index 443dbc68794..e536b7864d5 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
@@ -17,13 +17,12 @@
package org.apache.shardingsphere.data.pipeline.cdc.client.config;
+import com.google.common.base.Preconditions;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
/**
* CDC client configuration.
*/
-@RequiredArgsConstructor
@Getter
public final class CDCClientConfiguration {
@@ -32,4 +31,12 @@ public final class CDCClientConfiguration {
private final int port;
private final int timeoutMillis;
+
+ public CDCClientConfiguration(final String address, final int port, final
int timeoutMillis) {
+ Preconditions.checkArgument(null != address && !address.isEmpty(),
"The address parameter can't be null.");
+ Preconditions.checkArgument(port > 0, "The port must be greater than
0.");
+ this.address = address;
+ this.port = port;
+ this.timeoutMillis = timeoutMillis;
+ }
}