azexcy commented on code in PR #27542:
URL: https://github.com/apache/shardingsphere/pull/27542#discussion_r1280173424
##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java:
##########
@@ -93,14 +82,174 @@ protected void initChannel(final NioSocketChannel channel)
{
channel.pipeline().addLast(new
ProtobufDecoder(CDCResponse.getDefaultInstance()));
channel.pipeline().addLast(new
ProtobufVarint32LengthFieldPrepender());
channel.pipeline().addLast(new ProtobufEncoder());
- channel.pipeline().addLast(new
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
- channel.pipeline().addLast(new
CDCRequestHandler(parameter, consumer));
+
channel.pipeline().addLast(CDCLoginRequestHandler.class.getSimpleName(), new
CDCLoginRequestHandler());
+ channel.pipeline().addLast(new
CDCRequestHandler(config.getConsumer()));
}
});
+ channel = bootstrap.connect(config.getAddress(),
config.getPort()).sync().channel();
+ }
+
+ private void validateParameter(final CDCClientConfiguration parameter) {
+ if (null == parameter.getAddress() ||
parameter.getAddress().isEmpty()) {
+ throw new IllegalArgumentException("The address parameter can't be
null");
+ }
+ if (parameter.getPort() <= 0) {
+ throw new IllegalArgumentException("The port must be greater than
0");
+ }
+ }
+
+ /**
+ * Check channel is active.
+ *
+ * @return true if channel is active
+ */
+ public boolean isActive() {
+ return channel.isActive();
+ }
+
+ /**
+ * Await channel close.
+ *
+ * @throws InterruptedException interrupted exception
+ */
+ public void await() throws InterruptedException {
+ channel.closeFuture().sync();
+ }
+
+ /**
+ * Login.
+ *
+ * @param parameter parameter
+ * @throws IllegalStateException the channel is not active
+ * @throws IllegalArgumentException the user is illegal
+ * @throws GetResultTimeoutException get result timeout
+ */
+ public synchronized void login(final CDCLoginParameter parameter) {
+ if (null == channel || !channel.isActive()) {
+ throw new IllegalStateException("The channel is not active");
+ }
+ ClientConnectionContext connectionContext =
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+ if (ClientConnectionStatus.LOGGED_IN ==
connectionContext.getStatus().get()) {
+ throw new IllegalStateException("The client is already logged in");
+ }
+ LoginRequestBody loginRequestBody =
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
+
.setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+ String requestId = RequestIdUtils.generateRequestId();
+ CDCRequest data =
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
+ ResponseFuture responseFuture = new ResponseFuture();
+ connectionContext.getResponseFutureMap().put(requestId,
responseFuture);
+ channel.writeAndFlush(data);
try {
- ChannelFuture future = bootstrap.connect(address, port).sync();
- future.channel().closeFuture().sync();
+ getResultFromResponseFuture(responseFuture);
+ log.info("Login success, username: {}", parameter.getUsername());
} finally {
+ connectionContext.getResponseFutureMap().remove(requestId);
+ }
+ }
+
+ private Object getResultFromResponseFuture(final ResponseFuture
responseFuture) {
+ boolean receivedResult =
responseFuture.waitResponse(config.getTimeoutMills());
+ if (!receivedResult) {
+ throw new GetResultTimeoutException("Get result timeout");
+ }
+ if (!Strings.isNullOrEmpty(responseFuture.getErrorMessage())) {
+ throw new IllegalArgumentException(String.format("Get Response
failed, reason: %s", responseFuture.getErrorMessage()));
+ }
+ return responseFuture.getResult();
Review Comment:
3. I did not find a more suitable exception; `java.util.concurrent.TimeoutException`
is not a RuntimeException.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]