This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b0bd0a2f268 Clean up CDC code and refactor StartCDCClientParameter (#25254)
b0bd0a2f268 is described below

commit b0bd0a2f268e77445e841a57512414e02b033485
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Apr 21 11:33:55 2023 +0800

    Clean up CDC code and refactor StartCDCClientParameter (#25254)
    
    * Remove unused ImportDataSourceParameter
    
    * Remove unused try-catch in CDCRequestHandler.processDataRecords
    
    * Extract Consumer from StartCDCClientParameter
---
 .../data/pipeline/cdc/client/CDCClient.java        | 11 +++++--
 .../cdc/client/handler/CDCRequestHandler.java      | 15 ++--------
 .../parameter/ImportDataSourceParameter.java       | 35 ----------------------
 .../client/parameter/StartCDCClientParameter.java  |  4 ---
 .../pipeline/cdc/client/example/Bootstrap.java     |  4 +--
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  6 ++--
 6 files changed, 17 insertions(+), 58 deletions(-)

diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index d8532932415..459e6da6e4e 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -33,6 +33,10 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHand
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.List;
+import java.util.function.Consumer;
 
 /**
  * CDC client.
@@ -42,9 +46,12 @@ public final class CDCClient {
     
     private final StartCDCClientParameter parameter;
     
-    public CDCClient(final StartCDCClientParameter parameter) {
+    private final Consumer<List<Record>> consumer;
+    
+    public CDCClient(final StartCDCClientParameter parameter, final 
Consumer<List<Record>> consumer) {
         validateParameter(parameter);
         this.parameter = parameter;
+        this.consumer = consumer;
     }
     
     private void validateParameter(final StartCDCClientParameter parameter) {
@@ -85,7 +92,7 @@ public final class CDCClient {
                         channel.pipeline().addLast(new 
ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
                         channel.pipeline().addLast(new 
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new 
CDCRequestHandler(parameter));
+                        channel.pipeline().addLast(new 
CDCRequestHandler(parameter, consumer));
                     }
                 });
         try {
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 425e0b1b9a3..1a2cce307bc 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.client.handler;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
@@ -43,6 +44,7 @@ import java.util.function.Consumer;
 /**
  * CDC request handler.
  */
+@RequiredArgsConstructor
 @Slf4j
 public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
     
@@ -50,11 +52,6 @@ public final class CDCRequestHandler extends 
ChannelInboundHandlerAdapter {
     
     private final Consumer<List<Record>> consumer;
     
-    public CDCRequestHandler(final StartCDCClientParameter parameter) {
-        this.parameter = parameter;
-        consumer = parameter.getConsumer();
-    }
-    
     @Override
     public void userEventTriggered(final ChannelHandlerContext ctx, final 
Object evt) {
         if (evt instanceof StreamDataEvent) {
@@ -91,13 +88,7 @@ public final class CDCRequestHandler extends 
ChannelInboundHandlerAdapter {
     
     private void processDataRecords(final ChannelHandlerContext ctx, final 
DataRecordResult result) {
         List<Record> recordsList = result.getRecordList();
-        try {
-            consumer.accept(recordsList);
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            throw new RuntimeException(ex);
-        }
+        consumer.accept(recordsList);
         
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
     }
     
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
deleted file mode 100644
index 52a9664fe6d..00000000000
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Import data source parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ImportDataSourceParameter {
-    
-    private final String jdbcUrl;
-    
-    private final String username;
-    
-    private final String password;
-}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index d92a55fc6e2..f7148d4880b 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -21,10 +21,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 
 import java.util.List;
-import java.util.function.Consumer;
 
 /**
  * Start CDC client parameter.
@@ -47,6 +45,4 @@ public final class StartCDCClientParameter {
     private List<SchemaTable> schemaTables;
     
     private boolean full;
-    
-    private final Consumer<List<Record>> consumer;
 }
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
index fc968c3ddf9..3e371a5aa4b 100644
--- a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
@@ -36,7 +36,7 @@ public final class Bootstrap {
         // Pay attention to the time zone, to avoid the problem of incorrect time zone, it is best to ensure that the time zone of the program is consistent with the time zone of the database server
         // and mysql-connector-java 5.x version will ignore serverTimezone jdbc parameter and use the default time zone in the program
         // TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-        StartCDCClientParameter parameter = new 
StartCDCClientParameter(records -> log.info("records: {}", records));
+        StartCDCClientParameter parameter = new StartCDCClientParameter();
         parameter.setAddress("127.0.0.1");
         parameter.setPort(33071);
         parameter.setUsername("root");
@@ -44,7 +44,7 @@ public final class Bootstrap {
         parameter.setDatabase("sharding_db");
         parameter.setFull(true);
         
parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
-        CDCClient cdcClient = new CDCClient(parameter);
+        CDCClient cdcClient = new CDCClient(parameter, records -> 
log.info("records: {}", records));
         cdcClient.start();
     }
 }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 2a514cc8d99..58361dd07c7 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -170,8 +170,7 @@ class CDCE2EIT {
     private void startCDCClient(final PipelineContainerComposer 
containerComposer) {
         DataSource dataSource = 
StorageContainerUtils.generateDataSource(containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
 false)),
                 containerComposer.getUsername(), 
containerComposer.getPassword());
-        DataSourceRecordConsumer dataSourceRecordConsumer = new 
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
-        StartCDCClientParameter parameter = new 
StartCDCClientParameter(dataSourceRecordConsumer);
+        StartCDCClientParameter parameter = new StartCDCClientParameter();
         parameter.setAddress("localhost");
         
parameter.setPort(containerComposer.getContainerComposer().getProxyCDCPort());
         parameter.setUsername(ProxyContainerConstants.USERNAME);
@@ -181,7 +180,8 @@ class CDCE2EIT {
         parameter.setFull(true);
         String schema = 
containerComposer.getDatabaseType().isSchemaAvailable() ? "test" : "";
         
parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build()));
-        CompletableFuture.runAsync(() -> new CDCClient(parameter).start(), 
executor).whenComplete((unused, throwable) -> {
+        DataSourceRecordConsumer recordConsumer = new 
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
+        CompletableFuture.runAsync(() -> new CDCClient(parameter, 
recordConsumer).start(), executor).whenComplete((unused, throwable) -> {
             if (null != throwable) {
                 log.error("cdc client sync failed, ", throwable);
             }

Reply via email to