This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f1b29f068a5 Refactor MySQLClient (#32534)
f1b29f068a5 is described below
commit f1b29f068a56a277083aefb4fa223fef3eef1e7d
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Aug 15 18:13:59 2024 +0800
Refactor MySQLClient (#32534)
---
.../pipeline/mysql/ingest/client/MySQLClient.java | 48 +++++++++++++---------
1 file changed, 29 insertions(+), 19 deletions(-)
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 2517cb6060d..401dc1d7c03 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -31,6 +31,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -50,20 +51,23 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.netty.ChannelAttrInitializer;
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
- * MySQL Connector.
+ * MySQL client.
*/
@RequiredArgsConstructor
@Slf4j
@@ -73,8 +77,6 @@ public final class MySQLClient {
private final boolean decodeWithTX;
- private final AtomicInteger reconnectTimes = new AtomicInteger();
-
private final ArrayBlockingQueue<List<AbstractBinlogEvent>> blockingEventQueue = new ArrayBlockingQueue<>(2500);
private EventLoopGroup eventLoopGroup;
@@ -314,6 +316,8 @@ public final class MySQLClient {
private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;
+ private final AtomicBoolean reconnectRequested = new AtomicBoolean(false);
+
MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
}
@@ -324,7 +328,6 @@ public final class MySQLClient {
if (!running) {
return;
}
- reconnectTimes.set(0);
if (msg instanceof List) {
List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
if (records.isEmpty()) {
@@ -342,33 +345,40 @@ public final class MySQLClient {
}
@Override
- public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ public void channelInactive(final ChannelHandlerContext ctx) {
log.warn("MySQL binlog channel inactive, channel: {}, running: {}", ctx.channel(), running);
if (!running) {
return;
}
- reconnect();
+ tryReconnect();
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
- String fileName = null == lastBinlogEvent.get() ? null : lastBinlogEvent.get().getFileName();
- Long position = null == lastBinlogEvent.get() ? null : lastBinlogEvent.get().getPosition();
- log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
+ log.error("MySQLBinlogEventHandler protocol resolution error, channel: {}, lastBinlogEvent: {}", ctx.channel(), JsonUtils.toJsonString(lastBinlogEvent.get()), cause);
}
- private void reconnect() throws ExecutionException, InterruptedException, TimeoutException {
- Optional<ChannelFuture> future = closeChannel();
- if (future.isPresent()) {
- future.get().get(1L, TimeUnit.SECONDS);
+ private void tryReconnect() {
+ if (reconnectRequested.compareAndSet(false, true)) {
+ CompletableFuture.runAsync(this::reconnect).whenComplete((result, ex) -> reconnectRequested.set(false));
}
- if (reconnectTimes.incrementAndGet() > 3) {
- log.warn("Exceeds the maximum number of retry times, last binlog event:{}", lastBinlogEvent);
- return;
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ private synchronized void reconnect() {
+ for (int reconnectTimes = 0; reconnectTimes < 3; reconnectTimes++) {
+ try {
+ connect();
+ log.info("Reconnect times {}", reconnectTimes);
+ subscribe(lastBinlogEvent.get().getFileName(), lastBinlogEvent.get().getPosition());
+ break;
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.error("Reconnect failed, reconnect times: {}, lastBinlogEvent: {}", reconnectTimes, JsonUtils.toJsonString(lastBinlogEvent.get()), ex);
+ Thread.sleep(1000L << reconnectTimes);
+ }
}
- connect();
- log.info("Reconnect times {}", reconnectTimes.get());
- subscribe(lastBinlogEvent.get().getFileName(), lastBinlogEvent.get().getPosition());
}
}
}