This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd163a80e1 Fix sonar issue of MySQLBinlogEventHandler (#25644)
8dd163a80e1 is described below

commit 8dd163a80e153d252f426d6abda5ec6bae580eb8
Author: Liang Zhang <[email protected]>
AuthorDate: Sat May 13 21:12:10 2023 +0800

    Fix sonar issue of MySQLBinlogEventHandler (#25644)
    
    * Fix sonar issue of InventoryTask
    
    * Fix sonar issue of MySQLBinlogEventHandler
    
    * Fix sonar issue of ServerInfo
    
    * Fix sonar issue of ServerInfo
    
    * Fix sonar issue of MySQLCommandPacketDecoder
    
    * Fix sonar issue of MySQLCommandPacketDecoder
    
    * Fix sonar issue of MySQLNegotiateHandler
---
 .../data/pipeline/core/task/InventoryTask.java     | 13 ++++----
 .../pipeline/mysql/ingest/client/MySQLClient.java  | 19 +++++++-----
 .../pipeline/mysql/ingest/client/ServerInfo.java   |  6 ++--
 .../mysql/ingest/client/ServerVersion.java         |  2 +-
 .../client/netty/MySQLCommandPacketDecoder.java    | 36 ++++++++++++----------
 .../ingest/client/netty/MySQLNegotiateHandler.java |  5 ++-
 .../mysql/ingest/client/MySQLClientTest.java       |  5 ++-
 .../client/netty/MySQLNegotiateHandlerTest.java    |  3 +-
 8 files changed, 47 insertions(+), 42 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 2914a87f655..35007836e44 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -44,6 +44,7 @@ import javax.sql.DataSource;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Inventory task.
@@ -65,7 +66,7 @@ public final class InventoryTask implements PipelineTask, 
AutoCloseable {
     
     private final Importer importer;
     
-    private volatile IngestPosition<?> position;
+    private final AtomicReference<IngestPosition<?>> position;
     
     public InventoryTask(final InventoryDumperConfiguration 
inventoryDumperConfig, final ImporterConfiguration importerConfig,
                          final PipelineChannelCreator pipelineChannelCreator, 
final ImporterConnector importerConnector,
@@ -77,9 +78,9 @@ public final class InventoryTask implements PipelineTask, 
AutoCloseable {
         this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
         channel = createChannel(pipelineChannelCreator);
         dumper = new InventoryDumper(inventoryDumperConfig, channel, 
sourceDataSource, sourceMetaDataLoader);
-        importer = TypedSPILoader.getService(ImporterCreator.class, 
importerConnector.getType()).createImporter(importerConfig, importerConnector, 
channel, jobProgressListener,
-                ImporterType.INVENTORY);
-        position = inventoryDumperConfig.getPosition();
+        importer = TypedSPILoader.getService(ImporterCreator.class,
+                importerConnector.getType()).createImporter(importerConfig, 
importerConnector, channel, jobProgressListener, ImporterType.INVENTORY);
+        position = new AtomicReference<>(inventoryDumperConfig.getPosition());
     }
     
     private String generateTaskId(final InventoryDumperConfiguration 
inventoryDumperConfig) {
@@ -123,7 +124,7 @@ public final class InventoryTask implements PipelineTask, 
AutoCloseable {
         return pipelineChannelCreator.createPipelineChannel(1, records -> {
             Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
             if (null != lastNormalRecord) {
-                position = lastNormalRecord.getPosition();
+                position.set(lastNormalRecord.getPosition());
             }
         });
     }
@@ -136,7 +137,7 @@ public final class InventoryTask implements PipelineTask, 
AutoCloseable {
     
     @Override
     public InventoryTaskProgress getTaskProgress() {
-        return new InventoryTaskProgress(position);
+        return new InventoryTaskProgress(position.get());
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index a13e04f5b64..bec2dbd84c0 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -29,7 +29,6 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Promise;
-import lombok.AllArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
@@ -61,6 +60,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * MySQL Connector.
@@ -297,10 +297,13 @@ public final class MySQLClient {
         }
     }
     
-    @AllArgsConstructor
     private final class MySQLBinlogEventHandler extends 
ChannelInboundHandlerAdapter {
         
-        private volatile AbstractBinlogEvent lastBinlogEvent;
+        private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;
+        
+        MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
+            this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
+        }
         
         @Override
         public void channelRead(final ChannelHandlerContext ctx, final Object 
msg) throws Exception {
@@ -308,8 +311,8 @@ public final class MySQLClient {
                 return;
             }
             if (msg instanceof AbstractBinlogEvent) {
-                lastBinlogEvent = (AbstractBinlogEvent) msg;
-                blockingEventQueue.put(lastBinlogEvent);
+                lastBinlogEvent.set((AbstractBinlogEvent) msg);
+                blockingEventQueue.put(lastBinlogEvent.get());
                 reconnectTimes.set(0);
             }
         }
@@ -325,8 +328,8 @@ public final class MySQLClient {
         
         @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
-            String fileName = null == lastBinlogEvent ? null : 
lastBinlogEvent.getFileName();
-            Long position = null == lastBinlogEvent ? null : 
lastBinlogEvent.getPosition();
+            String fileName = null == lastBinlogEvent.get() ? null : 
lastBinlogEvent.get().getFileName();
+            Long position = null == lastBinlogEvent.get() ? null : 
lastBinlogEvent.get().getPosition();
             log.error("MySQLBinlogEventHandler protocol resolution error, file 
name:{}, position:{}", fileName, position, cause);
         }
         
@@ -339,7 +342,7 @@ public final class MySQLClient {
             reconnectTimes.incrementAndGet();
             connect();
             log.info("reconnect times {}", reconnectTimes.get());
-            subscribe(lastBinlogEvent.getFileName(), 
lastBinlogEvent.getPosition());
+            subscribe(lastBinlogEvent.get().getFileName(), 
lastBinlogEvent.get().getPosition());
         }
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
index 07fcdf17605..4c6e5b799c7 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
@@ -18,14 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 
 /**
  * MySQL server info.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
 public final class ServerInfo {
     
-    private volatile ServerVersion serverVersion;
+    private final ServerVersion serverVersion;
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
index 16285950169..0dd0d55741f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
@@ -24,7 +24,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
- * Server Version.
+ * Server version.
  */
 @Getter
 @Slf4j
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
index fec27a02dc0..d122f96891f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
@@ -31,19 +31,16 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * MySQL Command Packet decoder.
+ * MySQL command packet decoder.
  */
 public final class MySQLCommandPacketDecoder extends ByteToMessageDecoder {
     
-    private enum States {
-        RESPONSE_PACKET, FIELD_PACKET, ROW_DATA_PACKET
-    }
-    
-    private volatile States currentState = States.RESPONSE_PACKET;
+    private final AtomicReference<States> currentState = new 
AtomicReference<>(States.RESPONSE_PACKET);
     
-    private volatile InternalResultSet internalResultSet;
+    private final AtomicReference<InternalResultSet> internalResultSet = new 
AtomicReference<>();
     
     @Override
     protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, 
final List<Object> out) {
@@ -52,11 +49,11 @@ public final class MySQLCommandPacketDecoder extends 
ByteToMessageDecoder {
     }
     
     private void decodeCommandPacket(final MySQLPacketPayload payload, final 
List<Object> out) {
-        if (States.FIELD_PACKET == currentState) {
+        if (States.FIELD_PACKET == currentState.get()) {
             decodeFieldPacket(payload);
             return;
         }
-        if (States.ROW_DATA_PACKET == currentState) {
+        if (States.ROW_DATA_PACKET == currentState.get()) {
             decodeRowDataPacket(payload, out);
             return;
         }
@@ -66,20 +63,20 @@ public final class MySQLCommandPacketDecoder extends 
ByteToMessageDecoder {
     private void decodeFieldPacket(final MySQLPacketPayload payload) {
         if (MySQLEofPacket.HEADER == (payload.getByteBuf().getByte(0) & 0xff)) 
{
             new MySQLEofPacket(payload);
-            currentState = States.ROW_DATA_PACKET;
+            currentState.set(States.ROW_DATA_PACKET);
         } else {
-            internalResultSet.getFieldDescriptors().add(new 
MySQLColumnDefinition41Packet(payload));
+            internalResultSet.get().getFieldDescriptors().add(new 
MySQLColumnDefinition41Packet(payload));
         }
     }
     
     private void decodeRowDataPacket(final MySQLPacketPayload payload, final 
List<Object> out) {
         if (MySQLEofPacket.HEADER == (payload.getByteBuf().getByte(0) & 0xff)) 
{
             new MySQLEofPacket(payload);
-            out.add(internalResultSet);
-            currentState = States.RESPONSE_PACKET;
-            internalResultSet = null;
+            out.add(internalResultSet.get());
+            currentState.set(States.RESPONSE_PACKET);
+            internalResultSet.set(null);
         } else {
-            internalResultSet.getFieldValues().add(new 
MySQLTextResultSetRowPacket(payload, 
internalResultSet.getHeader().getColumnCount()));
+            internalResultSet.get().getFieldValues().add(new 
MySQLTextResultSetRowPacket(payload, 
internalResultSet.get().getHeader().getColumnCount()));
         }
     }
     
@@ -93,9 +90,14 @@ public final class MySQLCommandPacketDecoder extends 
ByteToMessageDecoder {
                 break;
             default:
                 MySQLFieldCountPacket fieldCountPacket = new 
MySQLFieldCountPacket(payload);
-                currentState = States.FIELD_PACKET;
-                internalResultSet = new InternalResultSet(fieldCountPacket);
+                currentState.set(States.FIELD_PACKET);
+                internalResultSet.set(new InternalResultSet(fieldCountPacket));
                 break;
         }
     }
+    
+    private enum States {
+        
+        RESPONSE_PACKET, FIELD_PACKET, ROW_DATA_PACKET
+    }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
index 780b8f4a21e..216e686444a 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
@@ -40,7 +40,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandsha
 import java.security.NoSuchAlgorithmException;
 
 /**
- * MySQL Negotiate Handler.
+ * MySQL negotiate handler.
  */
 @RequiredArgsConstructor
 public final class MySQLNegotiateHandler extends ChannelInboundHandlerAdapter {
@@ -75,8 +75,7 @@ public final class MySQLNegotiateHandler extends 
ChannelInboundHandlerAdapter {
             
handshakeResponsePacket.setCapabilityFlags(generateClientCapability());
             
handshakeResponsePacket.setAuthPluginName(MySQLAuthenticationMethod.NATIVE);
             ctx.channel().writeAndFlush(handshakeResponsePacket);
-            serverInfo = new ServerInfo();
-            serverInfo.setServerVersion(new 
ServerVersion(handshake.getServerVersion()));
+            serverInfo = new ServerInfo(new 
ServerVersion(handshake.getServerVersion()));
             return;
         }
         if (msg instanceof MySQLAuthSwitchRequestPacket) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index ae56e0983c3..dcc3972be7d 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -84,7 +84,7 @@ class MySQLClientTest {
     
     @Test
     void assertConnect() throws ReflectiveOperationException {
-        ServerInfo expected = new ServerInfo();
+        ServerInfo expected = new ServerInfo(new ServerVersion("5.5.0-log"));
         mockChannelResponse(expected);
         mysqlClient.connect();
         ServerInfo actual = (ServerInfo) 
Plugins.getMemberAccessor().get(MySQLClient.class.getDeclaredField("serverInfo"),
 mysqlClient);
@@ -123,8 +123,7 @@ class MySQLClientTest {
     
     @Test
     void assertSubscribeBelow56Version() throws ReflectiveOperationException {
-        ServerInfo serverInfo = new ServerInfo();
-        serverInfo.setServerVersion(new ServerVersion("5.5.0-log"));
+        ServerInfo serverInfo = new ServerInfo(new ServerVersion("5.5.0-log"));
         
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("serverInfo"),
 mysqlClient, serverInfo);
         
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("channel"), 
mysqlClient, channel);
         
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("eventLoopGroup"),
 mysqlClient, new NioEventLoopGroup(1));
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandlerTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandlerTest.java
index e37ed9672f6..053b28c5893 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandlerTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandlerTest.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import io.netty.util.concurrent.Promise;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ServerInfo;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ServerVersion;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLAuthenticationMethod;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
@@ -91,7 +92,7 @@ class MySQLNegotiateHandlerTest {
     @Test
     void assertChannelReadOkPacket() throws ReflectiveOperationException {
         MySQLOKPacket okPacket = new MySQLOKPacket(0);
-        ServerInfo serverInfo = new ServerInfo();
+        ServerInfo serverInfo = new ServerInfo(new ServerVersion("5.5.0-log"));
         
Plugins.getMemberAccessor().set(MySQLNegotiateHandler.class.getDeclaredField("serverInfo"),
 mysqlNegotiateHandler, serverInfo);
         mysqlNegotiateHandler.channelRead(channelHandlerContext, okPacket);
         verify(pipeline).remove(mysqlNegotiateHandler);

Reply via email to