azexcy commented on code in PR #27542:
URL: https://github.com/apache/shardingsphere/pull/27542#discussion_r1280173424


##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java:
##########
@@ -93,14 +82,174 @@ protected void initChannel(final NioSocketChannel channel) {
                         channel.pipeline().addLast(new ProtobufDecoder(CDCResponse.getDefaultInstance()));
                         channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
-                        channel.pipeline().addLast(new LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new CDCRequestHandler(parameter, consumer));
+                        channel.pipeline().addLast(CDCLoginRequestHandler.class.getSimpleName(), new CDCLoginRequestHandler());
+                        channel.pipeline().addLast(new CDCRequestHandler(config.getConsumer()));
                     }
                 });
+        channel = bootstrap.connect(config.getAddress(), config.getPort()).sync().channel();
+    }
+    
+    private void validateParameter(final CDCClientConfiguration parameter) {
+        if (null == parameter.getAddress() || parameter.getAddress().isEmpty()) {
+            throw new IllegalArgumentException("The address parameter can't be null");
+        }
+        if (parameter.getPort() <= 0) {
+            throw new IllegalArgumentException("The port must be greater than 0");
+        }
+    }
+    
+    /**
+     * Check channel is active.
+     *
+     * @return true if channel is active
+     */
+    public boolean isActive() {
+        return channel.isActive();
+    }
+    
+    /**
+     * Await channel close.
+     *
+     * @throws InterruptedException interrupted exception
+     */
+    public void await() throws InterruptedException {
+        channel.closeFuture().sync();
+    }
+    
+    /**
+     * Login.
+     *
+     * @param parameter parameter
+     * @throws IllegalStateException     the channel is not active
+     * @throws IllegalArgumentException  the user is illegal
+     * @throws GetResultTimeoutException get result timeout
+     */
+    public synchronized void login(final CDCLoginParameter parameter) {
+        if (null == channel || !channel.isActive()) {
+            throw new IllegalStateException("The channel is not active");
+        }
+        ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        if (ClientConnectionStatus.LOGGED_IN == connectionContext.getStatus().get()) {
+            throw new IllegalStateException("The client is already logged in");
+        }
+        LoginRequestBody loginRequestBody = LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
+                .setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest data = CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
+        ResponseFuture responseFuture = new ResponseFuture();
+        connectionContext.getResponseFutureMap().put(requestId, responseFuture);
+        channel.writeAndFlush(data);
         try {
-            ChannelFuture future = bootstrap.connect(address, port).sync();
-            future.channel().closeFuture().sync();
+            getResultFromResponseFuture(responseFuture);
+            log.info("Login success, username: {}", parameter.getUsername());
         } finally {
+            connectionContext.getResponseFutureMap().remove(requestId);
+        }
+    }
+    
+    private Object getResultFromResponseFuture(final ResponseFuture responseFuture) {
+        boolean receivedResult = responseFuture.waitResponse(config.getTimeoutMills());
+        if (!receivedResult) {
+            throw new GetResultTimeoutException("Get result timeout");
+        }
+        if (!Strings.isNullOrEmpty(responseFuture.getErrorMessage())) {
+            throw new IllegalArgumentException(String.format("Get Response failed, reason: %s", responseFuture.getErrorMessage()));
+        }
+        return responseFuture.getResult();

Review Comment:
   3. I did not find a more suitable exception; `java.util.concurrent.TimeoutException` is a checked exception, not a RuntimeException.
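
For illustration only, the point above could be sketched roughly as follows. `ResultTimeoutException` and `SimpleResponseFuture` are hypothetical stand-ins, not the PR's actual `GetResultTimeoutException` or `ResponseFuture`; the sketch assumes the future is backed by a `CountDownLatch`. Because the exception extends `RuntimeException`, callers such as `login` would not need a `throws` clause or a `try`/`catch`, which `java.util.concurrent.TimeoutException` would force on them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical unchecked exception signalling that no response arrived in time.
class ResultTimeoutException extends RuntimeException {
    ResultTimeoutException(String message) {
        super(message);
    }
}

// Hypothetical, simplified stand-in for a response future (assumption, not the PR's class).
class SimpleResponseFuture {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile Object result;

    // Called by the response handler once the server answers.
    void complete(Object value) {
        result = value;
        latch.countDown();
    }

    // Waits up to timeoutMillis; throws the unchecked exception on timeout.
    Object get(long timeoutMillis) {
        try {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new ResultTimeoutException("Get result timeout");
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new ResultTimeoutException("Interrupted while waiting for result");
        }
        return result;
    }
}
```

The trade-off is that an unchecked exception is easy to overlook, so it would still be worth documenting it in the Javadoc `@throws` list, as the `login` method in the diff already does.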



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.