This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c3e546f0f69 Refactor CDCClient (#37372)
c3e546f0f69 is described below

commit c3e546f0f6983bb1bcdfff1d87ba1aaef8f05906
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 13 13:22:55 2025 +0800

    Refactor CDCClient (#37372)
    
    * Refactor CDCClient
    
    * Refactor CDCClient
---
 .../data/pipeline/cdc/client/CDCClient.java        | 45 +++++++---------------
 .../cdc/client/config/CDCClientConfiguration.java  | 11 +++++-
 2 files changed, 22 insertions(+), 34 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 163a4de8fc2..b18fce5aa22 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.client;
 
+import com.google.common.base.Preconditions;
 import com.google.common.hash.Hashing;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -29,6 +30,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufEncoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
@@ -59,6 +61,7 @@ import java.util.function.Consumer;
 /**
  * CDC client.
  */
+@RequiredArgsConstructor
 @Slf4j
 public final class CDCClient implements AutoCloseable {
     
@@ -68,20 +71,6 @@ public final class CDCClient implements AutoCloseable {
     
     private Channel channel;
     
-    public CDCClient(final CDCClientConfiguration config) {
-        validateParameter(config);
-        this.config = config;
-    }
-    
-    private void validateParameter(final CDCClientConfiguration parameter) {
-        if (null == parameter.getAddress() || parameter.getAddress().isEmpty()) {
-            throw new IllegalArgumentException("The address parameter can't be null");
-        }
-        if (parameter.getPort() <= 0) {
-            throw new IllegalArgumentException("The port must be greater than 0");
-        }
-    }
-    
     /**
      * Connect.
      *
@@ -127,13 +116,11 @@ public final class CDCClient implements AutoCloseable {
      * @throws IllegalStateException the channel is not active
      */
     public synchronized void login(final CDCLoginParameter parameter) {
-        checkChannelActive();
+        Preconditions.checkState(null != channel && channel.isActive(), "The channel is not active, call the `connect` method first.");
         ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
-        if (ClientConnectionStatus.LOGGED_IN == connectionContext.getStatus().get()) {
-            throw new IllegalStateException("The client is already logged in");
-        }
-        LoginRequestBody loginRequestBody = LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
-                .setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+        Preconditions.checkState(ClientConnectionStatus.LOGGED_IN != connectionContext.getStatus().get(), "The client is already logged in.");
+        LoginRequestBody loginRequestBody = LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder()
+                .setUsername(parameter.getUsername()).setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
         String requestId = RequestIdUtils.generateRequestId();
         CDCRequest data = CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
         ResponseFuture responseFuture = new ResponseFuture(requestId, Type.LOGIN);
@@ -143,21 +130,15 @@ public final class CDCClient implements AutoCloseable {
         log.info("Login success, username: {}", parameter.getUsername());
     }
     
-    private void checkChannelActive() {
-        if (null == channel || !channel.isActive()) {
-            throw new IllegalStateException("The channel is not active, call the `connect` method first");
-        }
-    }
-    
     /**
      * Start streaming.
      *
      * @param parameter parameter
-     * @return streaming id
+     * @return streaming ID
      */
     public String startStreaming(final StartStreamingParameter parameter) {
-        StreamDataRequestBody streamDataRequestBody = StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
-                .addAllSourceSchemaTable(parameter.getSchemaTables()).build();
+        StreamDataRequestBody streamDataRequestBody = StreamDataRequestBody.newBuilder()
+                .setDatabase(parameter.getDatabase()).setFull(parameter.isFull()).addAllSourceSchemaTable(parameter.getSchemaTables()).build();
         String requestId = RequestIdUtils.generateRequestId();
         CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
         ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
@@ -172,7 +153,7 @@ public final class CDCClient implements AutoCloseable {
     /**
      * Restart streaming.
      *
-     * @param streamingId streaming id
+     * @param streamingId streaming ID
      */
     public void restartStreaming(final String streamingId) {
         String requestId = RequestIdUtils.generateRequestId();
@@ -189,7 +170,7 @@ public final class CDCClient implements AutoCloseable {
     /**
      * Stop streaming.
      *
-     * @param streamingId streaming id
+     * @param streamingId streaming ID
      */
     public void stopStreaming(final String streamingId) {
         String requestId = RequestIdUtils.generateRequestId();
@@ -207,7 +188,7 @@ public final class CDCClient implements AutoCloseable {
     /**
      * Drop streaming.
      *
-     * @param streamingId streaming id
+     * @param streamingId streaming ID
      */
     public void dropStreaming(final String streamingId) {
         String requestId = RequestIdUtils.generateRequestId();
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
index 443dbc68794..e536b7864d5 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
@@ -17,13 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.client.config;
 
+import com.google.common.base.Preconditions;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 
 /**
  * CDC client configuration.
  */
-@RequiredArgsConstructor
 @Getter
 public final class CDCClientConfiguration {
     
@@ -32,4 +31,12 @@ public final class CDCClientConfiguration {
     private final int port;
     
     private final int timeoutMillis;
+    
+    public CDCClientConfiguration(final String address, final int port, final int timeoutMillis) {
+        Preconditions.checkArgument(null != address && !address.isEmpty(), "The address parameter can't be null.");
+        Preconditions.checkArgument(port > 0, "The port must be greater than 0.");
+        this.address = address;
+        this.port = port;
+        this.timeoutMillis = timeoutMillis;
+    }
 }

Reply via email to