This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f1b29f068a5 Refactor MySQLClient (#32534)
f1b29f068a5 is described below

commit f1b29f068a56a277083aefb4fa223fef3eef1e7d
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 18:13:59 2024 +0800

    Refactor MySQLClient (#32534)
---
 .../pipeline/mysql/ingest/client/MySQLClient.java  | 48 +++++++++++++---------
 1 file changed, 29 insertions(+), 19 deletions(-)

diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 2517cb6060d..401dc1d7c03 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -31,6 +31,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Promise;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -50,20 +51,23 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.netty.ChannelAttrInitializer;
 import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * MySQL Connector.
+ * MySQL client.
  */
 @RequiredArgsConstructor
 @Slf4j
@@ -73,8 +77,6 @@ public final class MySQLClient {
     
     private final boolean decodeWithTX;
     
-    private final AtomicInteger reconnectTimes = new AtomicInteger();
-    
     private final ArrayBlockingQueue<List<AbstractBinlogEvent>> blockingEventQueue = new ArrayBlockingQueue<>(2500);
     
     private EventLoopGroup eventLoopGroup;
@@ -314,6 +316,8 @@ public final class MySQLClient {
         
         private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;
         
+        private final AtomicBoolean reconnectRequested = new AtomicBoolean(false);
+        
         MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
             this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
         }
@@ -324,7 +328,6 @@ public final class MySQLClient {
             if (!running) {
                 return;
             }
-            reconnectTimes.set(0);
             if (msg instanceof List) {
                 List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
                 if (records.isEmpty()) {
@@ -342,33 +345,40 @@ public final class MySQLClient {
         }
         
         @Override
-        public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        public void channelInactive(final ChannelHandlerContext ctx) {
             log.warn("MySQL binlog channel inactive, channel: {}, running: {}", ctx.channel(), running);
             if (!running) {
                 return;
             }
-            reconnect();
+            tryReconnect();
         }
         
         @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
-            String fileName = null == lastBinlogEvent.get() ? null : lastBinlogEvent.get().getFileName();
-            Long position = null == lastBinlogEvent.get() ? null : lastBinlogEvent.get().getPosition();
-            log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
+            log.error("MySQLBinlogEventHandler protocol resolution error, channel: {}, lastBinlogEvent: {}", ctx.channel(), JsonUtils.toJsonString(lastBinlogEvent.get()), cause);
         }
         
-        private void reconnect() throws ExecutionException, InterruptedException, TimeoutException {
-            Optional<ChannelFuture> future = closeChannel();
-            if (future.isPresent()) {
-                future.get().get(1L, TimeUnit.SECONDS);
+        private void tryReconnect() {
+            if (reconnectRequested.compareAndSet(false, true)) {
+                CompletableFuture.runAsync(this::reconnect).whenComplete((result, ex) -> reconnectRequested.set(false));
             }
-            if (reconnectTimes.incrementAndGet() > 3) {
-                log.warn("Exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
-                return;
+        }
+        
+        @SneakyThrows(InterruptedException.class)
+        private synchronized void reconnect() {
+            for (int reconnectTimes = 0; reconnectTimes < 3; reconnectTimes++) {
+                try {
+                    connect();
+                    log.info("Reconnect times {}", reconnectTimes);
+                    subscribe(lastBinlogEvent.get().getFileName(), lastBinlogEvent.get().getPosition());
+                    break;
+                    // CHECKSTYLE:OFF
+                } catch (final RuntimeException ex) {
+                    // CHECKSTYLE:ON
+                    log.error("Reconnect failed, reconnect times: {}, lastBinlogEvent: {}", reconnectTimes, JsonUtils.toJsonString(lastBinlogEvent.get()), ex);
+                    Thread.sleep(1000L << reconnectTimes);
+                }
             }
-            connect();
-            log.info("Reconnect times {}", reconnectTimes.get());
-            subscribe(lastBinlogEvent.get().getFileName(), lastBinlogEvent.get().getPosition());
         }
     }
 }

Reply via email to