Copilot commented on code in PR #8020:
URL: https://github.com/apache/incubator-seata/pull/8020#discussion_r2957994657
##########
core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java:
##########
@@ -362,6 +364,138 @@ public void testGetPoolKeyFunction() throws Exception {
assertNotNull(function);
}
+ @Test
+ public void unregisterResourceWithBlankParamsTest() {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+
+ client.setTransactionServiceGroup(null);
+ client.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+ verify(channelManager, never()).getChannels();
+
+ client.setTransactionServiceGroup("test_group");
+ client.unregisterResource("group1", "");
+ verify(channelManager, never()).getChannels();
+
+ client.unregisterResource("group1", null);
+ verify(channelManager, never()).getChannels();
+ }
Review Comment:
This test creates a `channelManager` mock but never injects it into the
client, so the `verify(channelManager, never()).getChannels()` assertions are
effectively checking an unused mock. Inject the mock via
`setChannelManager(client, channelManager)` (as done in the other tests) and
then verify no interactions, otherwise this test doesn't validate the
blank-param short-circuit behavior.
##########
core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java:
##########
@@ -292,6 +295,50 @@ public void sendRegisterMessage(String serverAddress,
Channel channel, String re
}
}
+ public void unregisterResource(String resourceGroupId, String resourceId) {
+ if (StringUtils.isBlank(transactionServiceGroup) ||
StringUtils.isBlank(resourceId)) {
+ return;
+ }
+ sendUnregisterToServers(resourceId);
+ }
Review Comment:
`resourceGroupId` is currently unused in this new API. If it’s intentionally
ignored (like `registerResource`), consider documenting that in Javadoc (or
remove the parameter if it’s not part of a required public signature) to avoid
confusion for callers.
##########
core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java:
##########
@@ -362,6 +364,138 @@ public void testGetPoolKeyFunction() throws Exception {
assertNotNull(function);
}
+ @Test
+ public void unregisterResourceWithBlankParamsTest() {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+
+ client.setTransactionServiceGroup(null);
+ client.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+ verify(channelManager, never()).getChannels();
+
+ client.setTransactionServiceGroup("test_group");
+ client.unregisterResource("group1", "");
+ verify(channelManager, never()).getChannels();
+
+ client.unregisterResource("group1", null);
+ verify(channelManager, never()).getChannels();
+ }
+
+ @Test
+ public void unregisterResourceWithActiveChannelTest() throws Exception {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ Channel channel = mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+
+ String serverAddress = "127.0.0.1:8091";
+ ConcurrentHashMap<String, Channel> channels = new
ConcurrentHashMap<>();
+ channels.put(serverAddress, channel);
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+ when(channelManager.getChannels()).thenReturn(channels);
+ setChannelManager(client, channelManager);
+
+ Version.putServerVersion(serverAddress, "2.6.0");
+
+ RmNettyRemotingClient spyClient = Mockito.spy(client);
+
+ spyClient.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+
+ verify(spyClient).sendAsyncRequest(eq(channel),
any(UnregisterRMRequest.class));
+
+ Version.SERVER_VERSION_MAP.remove(serverAddress);
+ }
+
+ @Test
+ public void unregisterResourceSkipsOldServerVersionTest() throws Exception
{
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ Channel channel = mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+
+ String serverAddress = "127.0.0.1:8091";
+ ConcurrentHashMap<String, Channel> channels = new
ConcurrentHashMap<>();
+ channels.put(serverAddress, channel);
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+ when(channelManager.getChannels()).thenReturn(channels);
+ setChannelManager(client, channelManager);
+
+ Version.putServerVersion(serverAddress, "2.5.0");
+
+ RmNettyRemotingClient spyClient = Mockito.spy(client);
+
+ spyClient.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+
+ verify(spyClient, never()).sendAsyncRequest(any(Channel.class),
any(UnregisterRMRequest.class));
+
+ Version.SERVER_VERSION_MAP.remove(serverAddress);
+ }
+
+ @Test
+ public void unregisterResourceSkipsInactiveChannelTest() throws Exception {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ Channel channel = mock(Channel.class);
+ when(channel.isActive()).thenReturn(false);
+
+ String serverAddress = "127.0.0.1:8091";
+ ConcurrentHashMap<String, Channel> channels = new
ConcurrentHashMap<>();
+ channels.put(serverAddress, channel);
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+ when(channelManager.getChannels()).thenReturn(channels);
+ setChannelManager(client, channelManager);
+
+ RmNettyRemotingClient spyClient = Mockito.spy(client);
+
+ spyClient.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+
+ verify(spyClient, never()).sendAsyncRequest(any(Channel.class),
any(UnregisterRMRequest.class));
+ }
+
+ @Test
+ public void unregisterResourceHandlesChannelNotWritableTest() throws
Exception {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ Channel channel = mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+
+ String serverAddress = "127.0.0.1:8091";
+ ConcurrentHashMap<String, Channel> channels = new
ConcurrentHashMap<>();
+ channels.put(serverAddress, channel);
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+ when(channelManager.getChannels()).thenReturn(channels);
+ setChannelManager(client, channelManager);
+
+ Version.putServerVersion(serverAddress, "2.6.0");
+
+ RmNettyRemotingClient spyClient = Mockito.spy(client);
+ doThrow(new FrameworkException("Channel is not writable",
FrameworkErrorCode.ChannelIsNotWritable))
+ .when(spyClient)
+ .sendAsyncRequest(eq(channel), any(UnregisterRMRequest.class));
+
+ spyClient.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+
+ verify(channelManager).releaseChannel(eq(channel), eq(serverAddress));
+
+ Version.SERVER_VERSION_MAP.remove(serverAddress);
Review Comment:
The tests mutate `Version.SERVER_VERSION_MAP` directly. Since `Version` now
exposes `removeServerVersion(...)`, prefer using that helper to avoid coupling
tests to the internal map representation (and to keep usage consistent with the
new API).
##########
core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java:
##########
@@ -362,6 +364,138 @@ public void testGetPoolKeyFunction() throws Exception {
assertNotNull(function);
}
+ @Test
+ public void unregisterResourceWithBlankParamsTest() {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+
+ client.setTransactionServiceGroup(null);
+ client.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+ verify(channelManager, never()).getChannels();
+
+ client.setTransactionServiceGroup("test_group");
+ client.unregisterResource("group1", "");
+ verify(channelManager, never()).getChannels();
+
+ client.unregisterResource("group1", null);
+ verify(channelManager, never()).getChannels();
+ }
+
+ @Test
+ public void unregisterResourceWithActiveChannelTest() throws Exception {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ Channel channel = mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+
+ String serverAddress = "127.0.0.1:8091";
+ ConcurrentHashMap<String, Channel> channels = new
ConcurrentHashMap<>();
+ channels.put(serverAddress, channel);
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+ when(channelManager.getChannels()).thenReturn(channels);
+ setChannelManager(client, channelManager);
+
+ Version.putServerVersion(serverAddress, "2.6.0");
+
+ RmNettyRemotingClient spyClient = Mockito.spy(client);
+
+ spyClient.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+
+ verify(spyClient).sendAsyncRequest(eq(channel),
any(UnregisterRMRequest.class));
+
+ Version.SERVER_VERSION_MAP.remove(serverAddress);
+ }
Review Comment:
The tests mutate `Version.SERVER_VERSION_MAP` directly. Since `Version` now
exposes `removeServerVersion(...)`, prefer using that helper to avoid coupling
tests to the internal map representation (and to keep usage consistent with the
new API).
##########
core/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java:
##########
@@ -362,6 +364,138 @@ public void testGetPoolKeyFunction() throws Exception {
assertNotNull(function);
}
+ @Test
+ public void unregisterResourceWithBlankParamsTest() {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+
+ client.setTransactionServiceGroup(null);
+ client.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+ verify(channelManager, never()).getChannels();
+
+ client.setTransactionServiceGroup("test_group");
+ client.unregisterResource("group1", "");
+ verify(channelManager, never()).getChannels();
+
+ client.unregisterResource("group1", null);
+ verify(channelManager, never()).getChannels();
+ }
+
+ @Test
+ public void unregisterResourceWithActiveChannelTest() throws Exception {
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ Channel channel = mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+
+ String serverAddress = "127.0.0.1:8091";
+ ConcurrentHashMap<String, Channel> channels = new
ConcurrentHashMap<>();
+ channels.put(serverAddress, channel);
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+ when(channelManager.getChannels()).thenReturn(channels);
+ setChannelManager(client, channelManager);
+
+ Version.putServerVersion(serverAddress, "2.6.0");
+
+ RmNettyRemotingClient spyClient = Mockito.spy(client);
+
+ spyClient.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+
+ verify(spyClient).sendAsyncRequest(eq(channel),
any(UnregisterRMRequest.class));
+
+ Version.SERVER_VERSION_MAP.remove(serverAddress);
+ }
+
+ @Test
+ public void unregisterResourceSkipsOldServerVersionTest() throws Exception
{
+ RmNettyRemotingClient client =
RmNettyRemotingClient.getInstance("app", "test_group");
+
+ Channel channel = mock(Channel.class);
+ when(channel.isActive()).thenReturn(true);
+
+ String serverAddress = "127.0.0.1:8091";
+ ConcurrentHashMap<String, Channel> channels = new
ConcurrentHashMap<>();
+ channels.put(serverAddress, channel);
+
+ NettyClientChannelManager channelManager =
mock(NettyClientChannelManager.class);
+ when(channelManager.getChannels()).thenReturn(channels);
+ setChannelManager(client, channelManager);
+
+ Version.putServerVersion(serverAddress, "2.5.0");
+
+ RmNettyRemotingClient spyClient = Mockito.spy(client);
+
+ spyClient.unregisterResource("group1",
"jdbc:mysql://localhost:3306/test");
+
+ verify(spyClient, never()).sendAsyncRequest(any(Channel.class),
any(UnregisterRMRequest.class));
+
+ Version.SERVER_VERSION_MAP.remove(serverAddress);
+ }
Review Comment:
The tests mutate `Version.SERVER_VERSION_MAP` directly. Since `Version` now
exposes `removeServerVersion(...)`, prefer using that helper to avoid coupling
tests to the internal map representation (and to keep usage consistent with the
new API).
##########
core/src/main/java/org/apache/seata/core/rpc/processor/server/UnregRmProcessor.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.processor.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.seata.common.Constants;
+import org.apache.seata.common.util.NetUtil;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.protocol.UnregisterRMRequest;
+import org.apache.seata.core.protocol.UnregisterRMResponse;
+import org.apache.seata.core.rpc.RemotingServer;
+import org.apache.seata.core.rpc.netty.ChannelManager;
+import org.apache.seata.core.rpc.processor.RemotingProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Process RM client unregister message.
+ * <p>
+ * process message type:
+ * {@link UnregisterRMRequest}
+ */
+public class UnregRmProcessor implements RemotingProcessor {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UnregRmProcessor.class);
+
+ private RemotingServer remotingServer;
+
+ public UnregRmProcessor(RemotingServer remotingServer) {
+ this.remotingServer = remotingServer;
+ }
+
+ @Override
+ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage)
throws Exception {
+ UnregisterRMRequest message = (UnregisterRMRequest)
rpcMessage.getBody();
+ String ipAndPort =
NetUtil.toStringAddress(ctx.channel().remoteAddress());
+ boolean isSuccess = false;
+ try {
+ String resourceIdStr = message.getResourceIds();
+ if (StringUtils.isBlank(resourceIdStr)) {
+ LOGGER.warn("RM unregister request has empty resourceIds,
client:{}", ipAndPort);
+ UnregisterRMResponse response = new
UnregisterRMResponse(false);
+ remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(),
response);
+ return;
+ }
+ Set<String> resourceIdSet = new
HashSet<>(Arrays.asList(resourceIdStr.split(Constants.DBKEYS_SPLIT_CHAR)));
+ ChannelManager.unregisterRMChannel(ctx.channel(), resourceIdSet);
+ isSuccess = true;
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("RM unregister success, message:{}, channel:{}",
message, ctx.channel());
+ }
+ } catch (Exception exx) {
+ LOGGER.error("RM unregister fail, client:{}, error message:{}",
ipAndPort, exx.getMessage());
Review Comment:
The error log in the exception path drops the stack trace by only logging
`exx.getMessage()`. This makes production debugging of unregister failures
difficult; pass the exception as the last argument to the logger (and consider
setting an error msg in the response for client visibility).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]