This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new daa89aed302 Remove ChannelFuture.sync() in MySQLClient.closeChannel to
avoid possible sync() blocking (#29706)
daa89aed302 is described below
commit daa89aed3024ccfccac5b10874a6a51aa1bfcd92
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 12 18:31:16 2024 +0800
Remove ChannelFuture.sync() in MySQLClient.closeChannel to avoid possible
sync() blocking (#29706)
* Remove ChannelFuture.sync() in MySQLClient.closeChannel to avoid possible
sync() blocking
* Update unit test
---
.../channel/memory/MemoryPipelineChannelTest.java | 2 +-
.../pipeline/mysql/ingest/client/MySQLClient.java | 30 ++++++++++++----------
.../mysql/ingest/client/MySQLClientTest.java | 8 ------
3 files changed, 17 insertions(+), 23 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index 812250a5727..651dd9166cb 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -48,7 +48,7 @@ class MemoryPipelineChannelTest {
});
thread.start();
assertTrue(semaphore.tryAcquire(1L, TimeUnit.SECONDS));
- assertThat(channel.fetch(1, 50L), is(records));
+ assertThat(channel.fetch(1, 500L), is(records));
}
@Test
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 2f234193fb9..01519a6b23c 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
@@ -277,21 +278,19 @@ public final class MySQLClient {
/**
* Close netty channel.
+ *
+ * @return channel future
*/
- public void closeChannel() {
+ public Optional<ChannelFuture> closeChannel() {
if (null == channel || !channel.isOpen()) {
- return;
+ return Optional.empty();
}
- try {
- running = false;
- channel.close().sync();
- if (null != eventLoopGroup) {
- eventLoopGroup.shutdownGracefully();
- }
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- log.error("close channel interrupted", ex);
+ running = false;
+ ChannelFuture future = channel.close();
+ if (null != eventLoopGroup) {
+ eventLoopGroup.shutdownGracefully();
}
+ return Optional.of(future);
}
private final class MySQLCommandResponseHandler extends
ChannelInboundHandlerAdapter {
@@ -344,7 +343,7 @@ public final class MySQLClient {
}
@Override
- public void channelInactive(final ChannelHandlerContext ctx) {
+ public void channelInactive(final ChannelHandlerContext ctx) throws
Exception {
log.warn("MySQL binlog channel inactive");
if (!running) {
return;
@@ -359,8 +358,11 @@ public final class MySQLClient {
log.error("MySQLBinlogEventHandler protocol resolution error, file
name:{}, position:{}", fileName, position, cause);
}
- private void reconnect() {
- closeChannel();
+ private void reconnect() throws ExecutionException,
InterruptedException, TimeoutException {
+ Optional<ChannelFuture> futureOptional = closeChannel();
+ if (futureOptional.isPresent()) {
+ futureOptional.get().get(1, TimeUnit.SECONDS);
+ }
if (reconnectTimes.incrementAndGet() > 3) {
log.warn("Exceeds the maximum number of retry times, last
binlog event:{}", lastBinlogEvent);
return;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index 7d9ffb84459..6367de3b1d1 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -153,13 +152,6 @@ class MySQLClientTest {
}
}
- @Test
- void assertCloseChannel() throws ReflectiveOperationException {
-
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("channel"),
mysqlClient, channel);
- mysqlClient.closeChannel();
- assertFalse(channel.isOpen());
- }
-
@Test
void assertPollFailed() throws ReflectiveOperationException {
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("channel"),
mysqlClient, channel);