This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new daa89aed302 Remove ChannelFuture.sync() in MySQLClient.closeChannel to 
avoid possible sync() blocking (#29706)
daa89aed302 is described below

commit daa89aed3024ccfccac5b10874a6a51aa1bfcd92
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 12 18:31:16 2024 +0800

    Remove ChannelFuture.sync() in MySQLClient.closeChannel to avoid possible 
sync() blocking (#29706)
    
    * Remove ChannelFuture.sync() in MySQLClient.closeChannel to avoid possible 
sync() blocking
    
    * Update unit test
---
 .../channel/memory/MemoryPipelineChannelTest.java  |  2 +-
 .../pipeline/mysql/ingest/client/MySQLClient.java  | 30 ++++++++++++----------
 .../mysql/ingest/client/MySQLClientTest.java       |  8 ------
 3 files changed, 17 insertions(+), 23 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index 812250a5727..651dd9166cb 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -48,7 +48,7 @@ class MemoryPipelineChannelTest {
         });
         thread.start();
         assertTrue(semaphore.tryAcquire(1L, TimeUnit.SECONDS));
-        assertThat(channel.fetch(1, 50L), is(records));
+        assertThat(channel.fetch(1, 500L), is(records));
     }
     
     @Test
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 2f234193fb9..01519a6b23c 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
@@ -277,21 +278,19 @@ public final class MySQLClient {
     
     /**
      * Close netty channel.
+     *
+     * @return channel future
      */
-    public void closeChannel() {
+    public Optional<ChannelFuture> closeChannel() {
         if (null == channel || !channel.isOpen()) {
-            return;
+            return Optional.empty();
         }
-        try {
-            running = false;
-            channel.close().sync();
-            if (null != eventLoopGroup) {
-                eventLoopGroup.shutdownGracefully();
-            }
-        } catch (final InterruptedException ex) {
-            Thread.currentThread().interrupt();
-            log.error("close channel interrupted", ex);
+        running = false;
+        ChannelFuture future = channel.close();
+        if (null != eventLoopGroup) {
+            eventLoopGroup.shutdownGracefully();
         }
+        return Optional.of(future);
     }
     
     private final class MySQLCommandResponseHandler extends 
ChannelInboundHandlerAdapter {
@@ -344,7 +343,7 @@ public final class MySQLClient {
         }
         
         @Override
-        public void channelInactive(final ChannelHandlerContext ctx) {
+        public void channelInactive(final ChannelHandlerContext ctx) throws 
Exception {
             log.warn("MySQL binlog channel inactive");
             if (!running) {
                 return;
@@ -359,8 +358,11 @@ public final class MySQLClient {
             log.error("MySQLBinlogEventHandler protocol resolution error, file 
name:{}, position:{}", fileName, position, cause);
         }
         
-        private void reconnect() {
-            closeChannel();
+        private void reconnect() throws ExecutionException, 
InterruptedException, TimeoutException {
+            Optional<ChannelFuture> futureOptional = closeChannel();
+            if (futureOptional.isPresent()) {
+                futureOptional.get().get(1, TimeUnit.SECONDS);
+            }
             if (reconnectTimes.incrementAndGet() > 3) {
                 log.warn("Exceeds the maximum number of retry times, last 
binlog event:{}", lastBinlogEvent);
                 return;
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index 7d9ffb84459..6367de3b1d1 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -153,13 +152,6 @@ class MySQLClientTest {
         }
     }
     
-    @Test
-    void assertCloseChannel() throws ReflectiveOperationException {
-        
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("channel"), 
mysqlClient, channel);
-        mysqlClient.closeChannel();
-        assertFalse(channel.isOpen());
-    }
-    
     @Test
     void assertPollFailed() throws ReflectiveOperationException {
         
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("channel"), 
mysqlClient, channel);

Reply via email to