This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aefeae30e1d Improve MySQLClient connect and supports recovery in the
middle of a batch event (#19852)
aefeae30e1d is described below
commit aefeae30e1de331443087739b5e4151048df9fdb
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Aug 4 18:26:04 2022 +0800
Improve MySQLClient connect and supports recovery in the middle of a batch
event (#19852)
---
.../mysql/ingest/GlobalTableMapEventMapping.java | 42 ++++++++++++++++++++++
.../mysql/ingest/binlog/BinlogContext.java | 9 ++---
.../pipeline/mysql/ingest/client/MySQLClient.java | 32 +++++++++--------
.../netty/MySQLBinlogEventPacketDecoder.java | 6 ++--
.../mysql/ingest/binlog/BinlogContextTest.java | 3 +-
.../mysql/ingest/client/MySQLClientTest.java | 9 +++--
.../netty/MySQLBinlogEventPacketDecoderTest.java | 3 +-
7 files changed, 78 insertions(+), 26 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
new file mode 100644
index 00000000000..92dd053e5d2
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest;
+
+import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Global table map event mapping, held in a static map keyed by database url.
+ * // TODO The mapping is kept only in memory, so it is lost when the Proxy restarts.
+ */
+public class GlobalTableMapEventMapping {
+
+ private static final Map<String, Map<Long,
MySQLBinlogTableMapEventPacket>> TABLE_MAP_EVENT_MAPPING = new
ConcurrentHashMap<>();
+
+ /**
+ * Get table map event map by database url, creating an empty one on first access.
+ *
+ * @param databaseUrl database url used as the mapping key (MySQLClient passes "host:port")
+ * @return table map event map for the given key, never null
+ */
+ public static Map<Long, MySQLBinlogTableMapEventPacket>
getTableMapEventMap(final String databaseUrl) {
+ return TABLE_MAP_EVENT_MAPPING.computeIfAbsent(databaseUrl, k -> new
ConcurrentHashMap<>());
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
index 76ec79b56e5..9c64b573293 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
@@ -18,11 +18,11 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,14 +30,15 @@ import java.util.Map;
* Binlog context.
*/
@Getter
-@Setter
+@RequiredArgsConstructor
public final class BinlogContext {
+ @Setter
private String fileName;
- private int checksumLength;
+ private final int checksumLength;
- private Map<Long, MySQLBinlogTableMapEventPacket> tableMap = new
HashMap<>();
+ private final Map<Long, MySQLBinlogTableMapEventPacket> tableMap;
/**
* Cache table map event.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 145c9cf33bf..5a7d4474c40 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -32,6 +32,7 @@ import io.netty.util.concurrent.Promise;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.GlobalTableMapEventMapping;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLCommandPacketDecoder;
@@ -51,6 +52,7 @@ import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -191,7 +193,8 @@ public final class MySQLClient {
responseCallback = null;
channel.pipeline().remove(MySQLCommandPacketDecoder.class);
channel.pipeline().remove(MySQLCommandResponseHandler.class);
- channel.pipeline().addLast(new
MySQLBinlogEventPacketDecoder(checksumLength));
+ String tableKey = String.join(":", connectInfo.getHost(),
String.valueOf(connectInfo.getPort()));
+ channel.pipeline().addLast(new
MySQLBinlogEventPacketDecoder(checksumLength,
GlobalTableMapEventMapping.getTableMapEventMap(tableKey)));
channel.pipeline().addLast(new MySQLBinlogEventHandler());
channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int)
binlogPosition, connectInfo.getServerId(), binlogFileName));
}
@@ -215,7 +218,7 @@ public final class MySQLClient {
@SuppressWarnings("unchecked")
private <T> T waitExpectedResponse(final Class<T> type) {
try {
- Object response = responseCallback.get();
+ Object response = responseCallback.get(5, TimeUnit.SECONDS);
if (null == response) {
return null;
}
@@ -226,7 +229,7 @@ public final class MySQLClient {
throw new RuntimeException(((MySQLErrPacket)
response).getErrorMessage());
}
throw new RuntimeException("unexpected response type");
- } catch (final InterruptedException | ExecutionException ex) {
+ } catch (final InterruptedException | ExecutionException |
TimeoutException ex) {
throw new RuntimeException(ex);
}
}
@@ -239,11 +242,9 @@ public final class MySQLClient {
return;
}
try {
- channel.close();
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- log.error("close channel error", ex);
+ channel.close().sync();
+ } catch (final InterruptedException ex) {
+ log.error("close channel interrupted", ex);
}
}
@@ -286,23 +287,24 @@ public final class MySQLClient {
if (!running) {
return;
}
- if (reconnectTimes.get() > 3) {
- log.warn("exceeds the maximum number of retry times, last
binlog event:{}", lastBinlogEvent);
- running = false;
- return;
- }
reconnect();
}
-
+
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final
Throwable cause) {
running = false;
String fileName = null == lastBinlogEvent ? null :
lastBinlogEvent.getFileName();
Long position = null == lastBinlogEvent ? null :
lastBinlogEvent.getPosition();
log.error("MySQLBinlogEventHandler protocol resolution error, file
name:{}, position:{}", fileName, position, cause);
+ reconnect();
}
-
+
private void reconnect() {
+ if (reconnectTimes.get() > 3) {
+ log.warn("exceeds the maximum number of retry times, last
binlog event:{}", lastBinlogEvent);
+ running = false;
+ return;
+ }
int retryTimes = reconnectTimes.incrementAndGet();
log.info("reconnect MySQL client, retry times={}", retryTimes);
closeChannel();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 6d5549624d1..37a6b79506c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -39,6 +39,7 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlog
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -49,9 +50,8 @@ public final class MySQLBinlogEventPacketDecoder extends
ByteToMessageDecoder {
private final BinlogContext binlogContext;
- public MySQLBinlogEventPacketDecoder(final int checksumLength) {
- binlogContext = new BinlogContext();
- binlogContext.setChecksumLength(checksumLength);
+ public MySQLBinlogEventPacketDecoder(final int checksumLength, final
Map<Long, MySQLBinlogTableMapEventPacket> tableMap) {
+ binlogContext = new BinlogContext(checksumLength, tableMap);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContextTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContextTest.java
index 6faa252f948..4dd9acebc64 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContextTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContextTest.java
@@ -27,6 +27,7 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
@@ -49,7 +50,7 @@ public final class BinlogContextTest {
@Before
public void setUp() {
- binlogContext = new BinlogContext();
+ binlogContext = new BinlogContext(4, new HashMap<>());
when(tableMapEventPacket.getSchemaName()).thenReturn(TEST_SCHEMA);
when(tableMapEventPacket.getTableName()).thenReturn(TEST_TABLE);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index 1bed2530ff9..f13cfdda984 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Promise;
@@ -52,14 +53,18 @@ public final class MySQLClientTest {
@Mock
private ChannelPipeline pipeline;
+ @Mock
+ private ChannelFuture channelFuture;
+
private MySQLClient mysqlClient;
@Before
- public void setUp() {
+ public void setUp() throws InterruptedException {
mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306,
"username", "password"));
when(channel.pipeline()).thenReturn(pipeline);
when(channel.isOpen()).thenReturn(true);
- when(channel.close()).thenAnswer(invocation -> {
+ when(channel.close()).thenReturn(channelFuture);
+ when(channelFuture.sync()).thenAnswer(invocation -> {
when(channel.isOpen()).thenReturn(false);
return null;
});
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index c9424aaf3dd..6e5160c7f0d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -43,6 +43,7 @@ import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -66,7 +67,7 @@ public final class MySQLBinlogEventPacketDecoderTest {
@Before
public void setUp() throws NoSuchFieldException, IllegalAccessException {
- binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4);
+ binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4, new
ConcurrentHashMap<>());
binlogContext = ReflectionUtil.getFieldValue(binlogEventPacketDecoder,
"binlogContext", BinlogContext.class);
when(channelHandlerContext.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()).thenReturn(StandardCharsets.UTF_8);
columnDefs = Lists.newArrayList(new
MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONGLONG), new
MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONG),