sandynz commented on code in PR #27542:
URL: https://github.com/apache/shardingsphere/pull/27542#discussion_r1280044222


##########
kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java:
##########
@@ -32,19 +35,17 @@ public final class Bootstrap {
      *
      * @param args args
      */
+    @SneakyThrows(InterruptedException.class)
     public static void main(final String[] args) {
         // Pay attention to the time zone, to avoid the problem of incorrect 
time zone, it is best to ensure that the time zone of the program is consistent 
with the time zone of the database server
-        // and mysql-connector-java 5.x version will ignore serverTimezone 
jdbc parameter and use the default time zone in the program
         // TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-        StartCDCClientParameter parameter = new StartCDCClientParameter();
-        parameter.setAddress("127.0.0.1");
-        parameter.setPort(33071);
-        parameter.setUsername("root");
-        parameter.setPassword("root");
-        parameter.setDatabase("sharding_db");
-        parameter.setFull(true);
-        
parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
-        CDCClient cdcClient = new CDCClient(parameter, records -> 
log.info("records: {}", records));
-        cdcClient.start();
+        String address = "127.0.0.1";
+        CDCClientConfiguration clientConfig = new 
CDCClientConfiguration(address, 33071, records -> log.info("records: {}", 
records));

Review Comment:
   When there's exception thrown on streaming, it should notify users, and then 
users could customize how to handle it, e.g. re-connect / re-subscribe / stop.



##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java:
##########
@@ -93,14 +82,174 @@ protected void initChannel(final NioSocketChannel channel) 
{
                         channel.pipeline().addLast(new 
ProtobufDecoder(CDCResponse.getDefaultInstance()));
                         channel.pipeline().addLast(new 
ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
-                        channel.pipeline().addLast(new 
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new 
CDCRequestHandler(parameter, consumer));
+                        
channel.pipeline().addLast(CDCLoginRequestHandler.class.getSimpleName(), new 
CDCLoginRequestHandler());
+                        channel.pipeline().addLast(new 
CDCRequestHandler(config.getConsumer()));
                     }
                 });
+        channel = bootstrap.connect(config.getAddress(), 
config.getPort()).sync().channel();
+    }
+    
+    private void validateParameter(final CDCClientConfiguration parameter) {
+        if (null == parameter.getAddress() || 
parameter.getAddress().isEmpty()) {
+            throw new IllegalArgumentException("The address parameter can't be 
null");
+        }
+        if (parameter.getPort() <= 0) {
+            throw new IllegalArgumentException("The port must be greater than 
0");
+        }
+    }
+    
+    /**
+     * Check channel is active.
+     *
+     * @return true if channel is active
+     */
+    public boolean isActive() {
+        return channel.isActive();
+    }
+    
+    /**
+     * Await channel close.
+     *
+     * @throws InterruptedException interrupted exception
+     */
+    public void await() throws InterruptedException {
+        channel.closeFuture().sync();
+    }
+    
+    /**
+     * Login.
+     *
+     * @param parameter parameter
+     * @throws IllegalStateException     the channel is not active
+     * @throws IllegalArgumentException  the user is illegal
+     * @throws GetResultTimeoutException get result timeout
+     */
+    public synchronized void login(final CDCLoginParameter parameter) {
+        if (null == channel || !channel.isActive()) {
+            throw new IllegalStateException("The channel is not active");
+        }
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        if (ClientConnectionStatus.LOGGED_IN == 
connectionContext.getStatus().get()) {
+            throw new IllegalStateException("The client is already logged in");
+        }
+        LoginRequestBody loginRequestBody = 
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
+                
.setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
+        ResponseFuture responseFuture = new ResponseFuture();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(data);
         try {
-            ChannelFuture future = bootstrap.connect(address, port).sync();
-            future.channel().closeFuture().sync();
+            getResultFromResponseFuture(responseFuture);
+            log.info("Login success, username: {}", parameter.getUsername());
         } finally {
+            connectionContext.getResponseFutureMap().remove(requestId);
+        }
+    }
+    
+    private Object getResultFromResponseFuture(final ResponseFuture 
responseFuture) {
+        boolean receivedResult = 
responseFuture.waitResponse(config.getTimeoutMills());
+        if (!receivedResult) {
+            throw new GetResultTimeoutException("Get result timeout");
+        }
+        if (!Strings.isNullOrEmpty(responseFuture.getErrorMessage())) {
+            throw new IllegalArgumentException(String.format("Get Response 
failed, reason: %s", responseFuture.getErrorMessage()));
+        }
+        return responseFuture.getResult();
+    }
+    
+    /**
+     * Start streaming.
+     *
+     * @param parameter parameter
+     * @return streaming id
+     * @throws IllegalStateException     start streaming failed

Review Comment:
   Where is `IllegalStateException` thrown



##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java:
##########
@@ -93,14 +82,174 @@ protected void initChannel(final NioSocketChannel channel) 
{
                         channel.pipeline().addLast(new 
ProtobufDecoder(CDCResponse.getDefaultInstance()));
                         channel.pipeline().addLast(new 
ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
-                        channel.pipeline().addLast(new 
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new 
CDCRequestHandler(parameter, consumer));
+                        
channel.pipeline().addLast(CDCLoginRequestHandler.class.getSimpleName(), new 
CDCLoginRequestHandler());
+                        channel.pipeline().addLast(new 
CDCRequestHandler(config.getConsumer()));
                     }
                 });
+        channel = bootstrap.connect(config.getAddress(), 
config.getPort()).sync().channel();
+    }
+    
+    private void validateParameter(final CDCClientConfiguration parameter) {
+        if (null == parameter.getAddress() || 
parameter.getAddress().isEmpty()) {
+            throw new IllegalArgumentException("The address parameter can't be 
null");
+        }
+        if (parameter.getPort() <= 0) {
+            throw new IllegalArgumentException("The port must be greater than 
0");
+        }
+    }
+    
+    /**
+     * Check channel is active.
+     *
+     * @return true if channel is active
+     */
+    public boolean isActive() {
+        return channel.isActive();
+    }
+    
+    /**
+     * Await channel close.
+     *
+     * @throws InterruptedException interrupted exception
+     */
+    public void await() throws InterruptedException {
+        channel.closeFuture().sync();
+    }
+    
+    /**
+     * Login.
+     *
+     * @param parameter parameter
+     * @throws IllegalStateException     the channel is not active
+     * @throws IllegalArgumentException  the user is illegal
+     * @throws GetResultTimeoutException get result timeout
+     */
+    public synchronized void login(final CDCLoginParameter parameter) {
+        if (null == channel || !channel.isActive()) {
+            throw new IllegalStateException("The channel is not active");
+        }
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        if (ClientConnectionStatus.LOGGED_IN == 
connectionContext.getStatus().get()) {
+            throw new IllegalStateException("The client is already logged in");
+        }
+        LoginRequestBody loginRequestBody = 
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
+                
.setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
+        ResponseFuture responseFuture = new ResponseFuture();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(data);
         try {
-            ChannelFuture future = bootstrap.connect(address, port).sync();
-            future.channel().closeFuture().sync();
+            getResultFromResponseFuture(responseFuture);
+            log.info("Login success, username: {}", parameter.getUsername());
         } finally {
+            connectionContext.getResponseFutureMap().remove(requestId);
+        }
+    }
+    
+    private Object getResultFromResponseFuture(final ResponseFuture 
responseFuture) {
+        boolean receivedResult = 
responseFuture.waitResponse(config.getTimeoutMills());
+        if (!receivedResult) {
+            throw new GetResultTimeoutException("Get result timeout");
+        }
+        if (!Strings.isNullOrEmpty(responseFuture.getErrorMessage())) {
+            throw new IllegalArgumentException(String.format("Get Response 
failed, reason: %s", responseFuture.getErrorMessage()));
+        }
+        return responseFuture.getResult();
+    }
+    
+    /**
+     * Start streaming.
+     *
+     * @param parameter parameter
+     * @return streaming id
+     * @throws IllegalStateException     start streaming failed
+     * @throws GetResultTimeoutException get result timeout
+     */
+    public String startStreaming(final StartStreamingParameter parameter) {
+        StreamDataRequestBody streamDataRequestBody = 
StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
+                .addAllSourceSchemaTable(parameter.getSchemaTables()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId(requestId).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        ResponseFuture responseFuture = new ResponseFuture();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(request);
+        try {
+            String streamingId = 
getResultFromResponseFuture(responseFuture).toString();
+            log.info("Start streaming success, streaming id: {}", streamingId);
+            return streamingId;

Review Comment:
   `streamingId` should be `result`



##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.config;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Start CDC client parameter.

Review Comment:
   Javadoc doesn't match class



##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java:
##########
@@ -93,14 +82,174 @@ protected void initChannel(final NioSocketChannel channel) 
{
                         channel.pipeline().addLast(new 
ProtobufDecoder(CDCResponse.getDefaultInstance()));
                         channel.pipeline().addLast(new 
ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
-                        channel.pipeline().addLast(new 
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new 
CDCRequestHandler(parameter, consumer));
+                        
channel.pipeline().addLast(CDCLoginRequestHandler.class.getSimpleName(), new 
CDCLoginRequestHandler());
+                        channel.pipeline().addLast(new 
CDCRequestHandler(config.getConsumer()));
                     }
                 });
+        channel = bootstrap.connect(config.getAddress(), 
config.getPort()).sync().channel();
+    }
+    
+    private void validateParameter(final CDCClientConfiguration parameter) {
+        if (null == parameter.getAddress() || 
parameter.getAddress().isEmpty()) {
+            throw new IllegalArgumentException("The address parameter can't be 
null");
+        }
+        if (parameter.getPort() <= 0) {
+            throw new IllegalArgumentException("The port must be greater than 
0");
+        }
+    }
+    
+    /**
+     * Check channel is active.
+     *
+     * @return true if channel is active
+     */
+    public boolean isActive() {
+        return channel.isActive();
+    }
+    
+    /**
+     * Await channel close.
+     *
+     * @throws InterruptedException interrupted exception
+     */
+    public void await() throws InterruptedException {
+        channel.closeFuture().sync();
+    }
+    
+    /**
+     * Login.
+     *
+     * @param parameter parameter
+     * @throws IllegalStateException     the channel is not active
+     * @throws IllegalArgumentException  the user is illegal
+     * @throws GetResultTimeoutException get result timeout
+     */
+    public synchronized void login(final CDCLoginParameter parameter) {
+        if (null == channel || !channel.isActive()) {
+            throw new IllegalStateException("The channel is not active");
+        }
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        if (ClientConnectionStatus.LOGGED_IN == 
connectionContext.getStatus().get()) {
+            throw new IllegalStateException("The client is already logged in");
+        }
+        LoginRequestBody loginRequestBody = 
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
+                
.setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
+        ResponseFuture responseFuture = new ResponseFuture();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(data);
         try {
-            ChannelFuture future = bootstrap.connect(address, port).sync();
-            future.channel().closeFuture().sync();
+            getResultFromResponseFuture(responseFuture);
+            log.info("Login success, username: {}", parameter.getUsername());
         } finally {
+            connectionContext.getResponseFutureMap().remove(requestId);
+        }
+    }
+    
+    private Object getResultFromResponseFuture(final ResponseFuture 
responseFuture) {
+        boolean receivedResult = 
responseFuture.waitResponse(config.getTimeoutMills());
+        if (!receivedResult) {
+            throw new GetResultTimeoutException("Get result timeout");
+        }
+        if (!Strings.isNullOrEmpty(responseFuture.getErrorMessage())) {
+            throw new IllegalArgumentException(String.format("Get Response 
failed, reason: %s", responseFuture.getErrorMessage()));
+        }
+        return responseFuture.getResult();
+    }
+    
+    /**
+     * Start streaming.
+     *
+     * @param parameter parameter
+     * @return streaming id
+     * @throws IllegalStateException     start streaming failed
+     * @throws GetResultTimeoutException get result timeout
+     */
+    public String startStreaming(final StartStreamingParameter parameter) {
+        StreamDataRequestBody streamDataRequestBody = 
StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
+                .addAllSourceSchemaTable(parameter.getSchemaTables()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId(requestId).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        ResponseFuture responseFuture = new ResponseFuture();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(request);
+        try {
+            String streamingId = 
getResultFromResponseFuture(responseFuture).toString();
+            log.info("Start streaming success, streaming id: {}", streamingId);
+            return streamingId;
+        } finally {
+            connectionContext.getResponseFutureMap().remove(requestId);
+        }
+    }
+    
+    /**
+     * Restart streaming.
+     *
+     * @param streamingId streaming id
+     * @throws IllegalStateException the channel is not active
+     */
+    public void restartStreaming(final String streamingId) {
+        if (checkStreamingIdExist(streamingId)) {
+            stopStreaming(streamingId);
+        }
+        String requestId = RequestIdUtils.generateRequestId();
+        StartStreamingRequestBody body = 
StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId(requestId).setType(Type.START_STREAMING).setStartStreamingRequestBody(body).build();
+        ResponseFuture responseFuture = new ResponseFuture();
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(request);
+        try {
+            getResultFromResponseFuture(responseFuture);
+            log.info("Restart streaming success, streaming id: {}", 
streamingId);
+        } finally {
+            connectionContext.getResponseFutureMap().remove(requestId);
+        }

Review Comment:
   Could we merge finally block code into `getResultFromResponseFuture`? Make 
sure it could be done automatically.



##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ResponseFuture.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Response future.
+ */
+@RequiredArgsConstructor
+@Getter
+@Setter
+public final class ResponseFuture {
+    
+    private final CountDownLatch countDownLatch = new CountDownLatch(1);

Review Comment:
   `countDownLatch` should not be exposed, could we move the `countDown` 
invocations in `ResponseFuture`?



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -132,9 +133,13 @@ public void channelRead(final ChannelHandlerContext ctx, 
final Object msg) {
     
     private void processLogin(final ChannelHandlerContext ctx, final 
CDCRequest request) {
         if (!request.hasLoginRequestBody() || 
!request.getLoginRequestBody().hasBasicBody()) {
-            throw new CDCExceptionWrapper(request.getRequestId(), new 
PipelineInvalidParameterException("Login request body is empty"));
+            throw new CDCExceptionWrapper(request.getRequestId(), new 
CDCLoginException("Login request body is empty"));
         }
         BasicBody body = request.getLoginRequestBody().getBasicBody();
+        CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+        if (null != connectionContext && 
Objects.equals(connectionContext.getCurrentUser().getGrantee().getUsername(), 
body.getUsername())) {
+            throw new CDCExceptionWrapper(request.getRequestId(), new 
CDCLoginException(String.format("%s already logged in", body.getUsername())));
+        }

Review Comment:
   If it has already logged, could we not throw exception and verify again?



##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java:
##########
@@ -17,32 +17,21 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 
-import java.util.List;
+import java.util.Set;
 
-/**
- * Start CDC client parameter.
- */
-@Getter
-@Setter
 @RequiredArgsConstructor
-public final class StartCDCClientParameter {
-    
-    private String address;
-    
-    private int port;
-    
-    private String username;
-    
-    private String password;
+@Getter
+@EqualsAndHashCode

Review Comment:
   Is `EqualsAndHashCode` annotation required?



##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java:
##########
@@ -93,14 +82,174 @@ protected void initChannel(final NioSocketChannel channel) 
{
                         channel.pipeline().addLast(new 
ProtobufDecoder(CDCResponse.getDefaultInstance()));
                         channel.pipeline().addLast(new 
ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
-                        channel.pipeline().addLast(new 
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new 
CDCRequestHandler(parameter, consumer));
+                        
channel.pipeline().addLast(CDCLoginRequestHandler.class.getSimpleName(), new 
CDCLoginRequestHandler());
+                        channel.pipeline().addLast(new 
CDCRequestHandler(config.getConsumer()));
                     }
                 });
+        channel = bootstrap.connect(config.getAddress(), 
config.getPort()).sync().channel();
+    }
+    
+    private void validateParameter(final CDCClientConfiguration parameter) {
+        if (null == parameter.getAddress() || 
parameter.getAddress().isEmpty()) {
+            throw new IllegalArgumentException("The address parameter can't be 
null");
+        }
+        if (parameter.getPort() <= 0) {
+            throw new IllegalArgumentException("The port must be greater than 
0");
+        }
+    }
+    
+    /**
+     * Check channel is active.
+     *
+     * @return true if channel is active
+     */
+    public boolean isActive() {
+        return channel.isActive();
+    }
+    
+    /**
+     * Await channel close.
+     *
+     * @throws InterruptedException interrupted exception
+     */
+    public void await() throws InterruptedException {
+        channel.closeFuture().sync();
+    }
+    
+    /**
+     * Login.
+     *
+     * @param parameter parameter
+     * @throws IllegalStateException     the channel is not active
+     * @throws IllegalArgumentException  the user is illegal
+     * @throws GetResultTimeoutException get result timeout
+     */
+    public synchronized void login(final CDCLoginParameter parameter) {
+        if (null == channel || !channel.isActive()) {
+            throw new IllegalStateException("The channel is not active");
+        }
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        if (ClientConnectionStatus.LOGGED_IN == 
connectionContext.getStatus().get()) {
+            throw new IllegalStateException("The client is already logged in");
+        }
+        LoginRequestBody loginRequestBody = 
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
+                
.setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
+        ResponseFuture responseFuture = new ResponseFuture();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(data);
         try {
-            ChannelFuture future = bootstrap.connect(address, port).sync();
-            future.channel().closeFuture().sync();
+            getResultFromResponseFuture(responseFuture);
+            log.info("Login success, username: {}", parameter.getUsername());
         } finally {
+            connectionContext.getResponseFutureMap().remove(requestId);
+        }
+    }
+    
+    private Object getResultFromResponseFuture(final ResponseFuture 
responseFuture) {
+        boolean receivedResult = 
responseFuture.waitResponse(config.getTimeoutMills());
+        if (!receivedResult) {
+            throw new GetResultTimeoutException("Get result timeout");
+        }
+        if (!Strings.isNullOrEmpty(responseFuture.getErrorMessage())) {
+            throw new IllegalArgumentException(String.format("Get Response 
failed, reason: %s", responseFuture.getErrorMessage()));
+        }
+        return responseFuture.getResult();

Review Comment:
   1, Seems check error message should be before check received or not.
   
   2, Could we merge this block of code into `waitResponse`?
   
   3, Is there underlying timeout exception to replace 
`GetResultTimeoutException`?
   
   4, Could we use another exception to replace `IllegalArgumentException`? 
Looks it's not suitable.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to