This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8dd163a80e1 Fix sonar issue of MySQLBinlogEventHandler (#25644)
8dd163a80e1 is described below
commit 8dd163a80e153d252f426d6abda5ec6bae580eb8
Author: Liang Zhang <[email protected]>
AuthorDate: Sat May 13 21:12:10 2023 +0800
Fix sonar issue of MySQLBinlogEventHandler (#25644)
* Fix sonar issue of InventoryTask
* Fix sonar issue of MySQLBinlogEventHandler
* Fix sonar issue of ServerInfo
* Fix sonar issue of ServerInfo
* Fix sonar issue of MySQLCommandPacketDecoder
* Fix sonar issue of MySQLCommandPacketDecoder
* Fix sonar issue of MySQLNegotiateHandler
---
.../data/pipeline/core/task/InventoryTask.java | 13 ++++----
.../pipeline/mysql/ingest/client/MySQLClient.java | 19 +++++++-----
.../pipeline/mysql/ingest/client/ServerInfo.java | 6 ++--
.../mysql/ingest/client/ServerVersion.java | 2 +-
.../client/netty/MySQLCommandPacketDecoder.java | 36 ++++++++++++----------
.../ingest/client/netty/MySQLNegotiateHandler.java | 5 ++-
.../mysql/ingest/client/MySQLClientTest.java | 5 ++-
.../client/netty/MySQLNegotiateHandlerTest.java | 3 +-
8 files changed, 47 insertions(+), 42 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 2914a87f655..35007836e44 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -44,6 +44,7 @@ import javax.sql.DataSource;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Inventory task.
@@ -65,7 +66,7 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
private final Importer importer;
- private volatile IngestPosition<?> position;
+ private final AtomicReference<IngestPosition<?>> position;
public InventoryTask(final InventoryDumperConfiguration
inventoryDumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelCreator pipelineChannelCreator,
final ImporterConnector importerConnector,
@@ -77,9 +78,9 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
channel = createChannel(pipelineChannelCreator);
dumper = new InventoryDumper(inventoryDumperConfig, channel,
sourceDataSource, sourceMetaDataLoader);
- importer = TypedSPILoader.getService(ImporterCreator.class,
importerConnector.getType()).createImporter(importerConfig, importerConnector,
channel, jobProgressListener,
- ImporterType.INVENTORY);
- position = inventoryDumperConfig.getPosition();
+ importer = TypedSPILoader.getService(ImporterCreator.class,
+ importerConnector.getType()).createImporter(importerConfig,
importerConnector, channel, jobProgressListener, ImporterType.INVENTORY);
+ position = new AtomicReference<>(inventoryDumperConfig.getPosition());
}
private String generateTaskId(final InventoryDumperConfiguration
inventoryDumperConfig) {
@@ -123,7 +124,7 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
return pipelineChannelCreator.createPipelineChannel(1, records -> {
Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
if (null != lastNormalRecord) {
- position = lastNormalRecord.getPosition();
+ position.set(lastNormalRecord.getPosition());
}
});
}
@@ -136,7 +137,7 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
@Override
public InventoryTaskProgress getTaskProgress() {
- return new InventoryTaskProgress(position);
+ return new InventoryTaskProgress(position.get());
}
@Override
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index a13e04f5b64..bec2dbd84c0 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -29,7 +29,6 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
-import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
@@ -61,6 +60,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* MySQL Connector.
@@ -297,10 +297,13 @@ public final class MySQLClient {
}
}
- @AllArgsConstructor
private final class MySQLBinlogEventHandler extends
ChannelInboundHandlerAdapter {
- private volatile AbstractBinlogEvent lastBinlogEvent;
+ private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;
+
+ MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
+ this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
+ }
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object
msg) throws Exception {
@@ -308,8 +311,8 @@ public final class MySQLClient {
return;
}
if (msg instanceof AbstractBinlogEvent) {
- lastBinlogEvent = (AbstractBinlogEvent) msg;
- blockingEventQueue.put(lastBinlogEvent);
+ lastBinlogEvent.set((AbstractBinlogEvent) msg);
+ blockingEventQueue.put(lastBinlogEvent.get());
reconnectTimes.set(0);
}
}
@@ -325,8 +328,8 @@ public final class MySQLClient {
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final
Throwable cause) {
- String fileName = null == lastBinlogEvent ? null :
lastBinlogEvent.getFileName();
- Long position = null == lastBinlogEvent ? null :
lastBinlogEvent.getPosition();
+ String fileName = null == lastBinlogEvent.get() ? null :
lastBinlogEvent.get().getFileName();
+ Long position = null == lastBinlogEvent.get() ? null :
lastBinlogEvent.get().getPosition();
log.error("MySQLBinlogEventHandler protocol resolution error, file
name:{}, position:{}", fileName, position, cause);
}
@@ -339,7 +342,7 @@ public final class MySQLClient {
reconnectTimes.incrementAndGet();
connect();
log.info("reconnect times {}", reconnectTimes.get());
- subscribe(lastBinlogEvent.getFileName(),
lastBinlogEvent.getPosition());
+ subscribe(lastBinlogEvent.get().getFileName(),
lastBinlogEvent.get().getPosition());
}
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
index 07fcdf17605..4c6e5b799c7 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerInfo.java
@@ -18,14 +18,14 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
/**
* MySQL server info.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
public final class ServerInfo {
- private volatile ServerVersion serverVersion;
+ private final ServerVersion serverVersion;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
index 16285950169..0dd0d55741f 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/ServerVersion.java
@@ -24,7 +24,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
- * Server Version.
+ * Server version.
*/
@Getter
@Slf4j
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
index fec27a02dc0..d122f96891f 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLCommandPacketDecoder.java
@@ -31,19 +31,16 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
/**
- * MySQL Command Packet decoder.
+ * MySQL command packet decoder.
*/
public final class MySQLCommandPacketDecoder extends ByteToMessageDecoder {
- private enum States {
- RESPONSE_PACKET, FIELD_PACKET, ROW_DATA_PACKET
- }
-
- private volatile States currentState = States.RESPONSE_PACKET;
+ private final AtomicReference<States> currentState = new
AtomicReference<>(States.RESPONSE_PACKET);
- private volatile InternalResultSet internalResultSet;
+ private final AtomicReference<InternalResultSet> internalResultSet = new
AtomicReference<>();
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in,
final List<Object> out) {
@@ -52,11 +49,11 @@ public final class MySQLCommandPacketDecoder extends
ByteToMessageDecoder {
}
private void decodeCommandPacket(final MySQLPacketPayload payload, final
List<Object> out) {
- if (States.FIELD_PACKET == currentState) {
+ if (States.FIELD_PACKET == currentState.get()) {
decodeFieldPacket(payload);
return;
}
- if (States.ROW_DATA_PACKET == currentState) {
+ if (States.ROW_DATA_PACKET == currentState.get()) {
decodeRowDataPacket(payload, out);
return;
}
@@ -66,20 +63,20 @@ public final class MySQLCommandPacketDecoder extends
ByteToMessageDecoder {
private void decodeFieldPacket(final MySQLPacketPayload payload) {
if (MySQLEofPacket.HEADER == (payload.getByteBuf().getByte(0) & 0xff))
{
new MySQLEofPacket(payload);
- currentState = States.ROW_DATA_PACKET;
+ currentState.set(States.ROW_DATA_PACKET);
} else {
- internalResultSet.getFieldDescriptors().add(new
MySQLColumnDefinition41Packet(payload));
+ internalResultSet.get().getFieldDescriptors().add(new
MySQLColumnDefinition41Packet(payload));
}
}
private void decodeRowDataPacket(final MySQLPacketPayload payload, final
List<Object> out) {
if (MySQLEofPacket.HEADER == (payload.getByteBuf().getByte(0) & 0xff))
{
new MySQLEofPacket(payload);
- out.add(internalResultSet);
- currentState = States.RESPONSE_PACKET;
- internalResultSet = null;
+ out.add(internalResultSet.get());
+ currentState.set(States.RESPONSE_PACKET);
+ internalResultSet.set(null);
} else {
- internalResultSet.getFieldValues().add(new
MySQLTextResultSetRowPacket(payload,
internalResultSet.getHeader().getColumnCount()));
+ internalResultSet.get().getFieldValues().add(new
MySQLTextResultSetRowPacket(payload,
internalResultSet.get().getHeader().getColumnCount()));
}
}
@@ -93,9 +90,14 @@ public final class MySQLCommandPacketDecoder extends
ByteToMessageDecoder {
break;
default:
MySQLFieldCountPacket fieldCountPacket = new
MySQLFieldCountPacket(payload);
- currentState = States.FIELD_PACKET;
- internalResultSet = new InternalResultSet(fieldCountPacket);
+ currentState.set(States.FIELD_PACKET);
+ internalResultSet.set(new InternalResultSet(fieldCountPacket));
break;
}
}
+
+ private enum States {
+
+ RESPONSE_PACKET, FIELD_PACKET, ROW_DATA_PACKET
+ }
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
index 780b8f4a21e..216e686444a 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandler.java
@@ -40,7 +40,7 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandsha
import java.security.NoSuchAlgorithmException;
/**
- * MySQL Negotiate Handler.
+ * MySQL negotiate handler.
*/
@RequiredArgsConstructor
public final class MySQLNegotiateHandler extends ChannelInboundHandlerAdapter {
@@ -75,8 +75,7 @@ public final class MySQLNegotiateHandler extends
ChannelInboundHandlerAdapter {
handshakeResponsePacket.setCapabilityFlags(generateClientCapability());
handshakeResponsePacket.setAuthPluginName(MySQLAuthenticationMethod.NATIVE);
ctx.channel().writeAndFlush(handshakeResponsePacket);
- serverInfo = new ServerInfo();
- serverInfo.setServerVersion(new
ServerVersion(handshake.getServerVersion()));
+ serverInfo = new ServerInfo(new
ServerVersion(handshake.getServerVersion()));
return;
}
if (msg instanceof MySQLAuthSwitchRequestPacket) {
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index ae56e0983c3..dcc3972be7d 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -84,7 +84,7 @@ class MySQLClientTest {
@Test
void assertConnect() throws ReflectiveOperationException {
- ServerInfo expected = new ServerInfo();
+ ServerInfo expected = new ServerInfo(new ServerVersion("5.5.0-log"));
mockChannelResponse(expected);
mysqlClient.connect();
ServerInfo actual = (ServerInfo)
Plugins.getMemberAccessor().get(MySQLClient.class.getDeclaredField("serverInfo"),
mysqlClient);
@@ -123,8 +123,7 @@ class MySQLClientTest {
@Test
void assertSubscribeBelow56Version() throws ReflectiveOperationException {
- ServerInfo serverInfo = new ServerInfo();
- serverInfo.setServerVersion(new ServerVersion("5.5.0-log"));
+ ServerInfo serverInfo = new ServerInfo(new ServerVersion("5.5.0-log"));
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("serverInfo"),
mysqlClient, serverInfo);
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("channel"),
mysqlClient, channel);
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("eventLoopGroup"),
mysqlClient, new NioEventLoopGroup(1));
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandlerTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandlerTest.java
index e37ed9672f6..053b28c5893 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandlerTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLNegotiateHandlerTest.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.util.concurrent.Promise;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ServerInfo;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ServerVersion;
import
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLAuthenticationMethod;
import
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
@@ -91,7 +92,7 @@ class MySQLNegotiateHandlerTest {
@Test
void assertChannelReadOkPacket() throws ReflectiveOperationException {
MySQLOKPacket okPacket = new MySQLOKPacket(0);
- ServerInfo serverInfo = new ServerInfo();
+ ServerInfo serverInfo = new ServerInfo(new ServerVersion("5.5.0-log"));
Plugins.getMemberAccessor().set(MySQLNegotiateHandler.class.getDeclaredField("serverInfo"),
mysqlNegotiateHandler, serverInfo);
mysqlNegotiateHandler.channelRead(channelHandlerContext, okPacket);
verify(pipeline).remove(mysqlNegotiateHandler);