[
https://issues.apache.org/jira/browse/KAFKA-7712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719303#comment-16719303
]
ASF GitHub Bot commented on KAFKA-7712:
---------------------------------------
hachikuji closed pull request #6023: KAFKA-7712: Remove channel from Selector
before propagating exception
URL: https://github.com/apache/kafka/pull/6023
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is displayed below for the sake of
provenance:
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 843d46dc736..8c46746d847 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -250,10 +250,11 @@ public Selector(long connectionMaxIdleMS, int
failedAuthenticationDelayMs, Metri
public void connect(String id, InetSocketAddress address, int
sendBufferSize, int receiveBufferSize) throws IOException {
ensureNotRegistered(id);
SocketChannel socketChannel = SocketChannel.open();
+ SelectionKey key = null;
try {
configureSocketChannel(socketChannel, sendBufferSize,
receiveBufferSize);
boolean connected = doConnect(socketChannel, address);
- SelectionKey key = registerChannel(id, socketChannel,
SelectionKey.OP_CONNECT);
+ key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
@@ -262,6 +263,9 @@ public void connect(String id, InetSocketAddress address,
int sendBufferSize, in
key.interestOps(0);
}
} catch (IOException | RuntimeException e) {
+ if (key != null)
+ immediatelyConnectedKeys.remove(key);
+ channels.remove(id);
socketChannel.close();
throw e;
}
@@ -316,7 +320,7 @@ private void ensureNotRegistered(String id) {
throw new IllegalStateException("There is already a connection for
id " + id + " that is still being closed");
}
- private SelectionKey registerChannel(String id, SocketChannel
socketChannel, int interestedOps) throws IOException {
+ protected SelectionKey registerChannel(String id, SocketChannel
socketChannel, int interestedOps) throws IOException {
SelectionKey key = socketChannel.register(nioSelector, interestedOps);
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id,
key);
this.channels.put(id, channel);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 6cf75861122..2da1cc68198 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -51,6 +51,7 @@
import java.util.Random;
import java.util.Set;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -380,16 +381,7 @@ public void testIdleExpiryWithoutReadyKeys() throws
IOException {
@Test
public void testImmediatelyConnectedCleaned() throws Exception {
Metrics metrics = new Metrics(); // new metrics object to avoid metric
registration conflicts
- Selector selector = new Selector(5000, metrics, time, "MetricGroup",
channelBuilder, new LogContext()) {
- @Override
- protected boolean doConnect(SocketChannel channel,
InetSocketAddress address) throws IOException {
- // Use a blocking connect to trigger the immediately connected
path
- channel.configureBlocking(true);
- boolean connected = super.doConnect(channel, address);
- channel.configureBlocking(false);
- return connected;
- }
- };
+ Selector selector = new ImmediatelyConnectingSelector(5000, metrics,
time, "MetricGroup", channelBuilder, new LogContext());
try {
testImmediatelyConnectedCleaned(selector, true);
@@ -400,6 +392,26 @@ protected boolean doConnect(SocketChannel channel,
InetSocketAddress address) th
}
}
+ private static class ImmediatelyConnectingSelector extends Selector {
+ public ImmediatelyConnectingSelector(long connectionMaxIdleMS,
+ Metrics metrics,
+ Time time,
+ String metricGrpPrefix,
+ ChannelBuilder channelBuilder,
+ LogContext logContext) {
+ super(connectionMaxIdleMS, metrics, time, metricGrpPrefix,
channelBuilder, logContext);
+ }
+
+ @Override
+ protected boolean doConnect(SocketChannel channel, InetSocketAddress
address) throws IOException {
+ // Use a blocking connect to trigger the immediately connected path
+ channel.configureBlocking(true);
+ boolean connected = super.doConnect(channel, address);
+ channel.configureBlocking(false);
+ return connected;
+ }
+ }
+
private void testImmediatelyConnectedCleaned(Selector selector, boolean
closeAfterFirstPoll) throws Exception {
String id = "0";
selector.connect(id, new InetSocketAddress("localhost", server.port),
BUFFER_SIZE, BUFFER_SIZE);
@@ -412,6 +424,46 @@ private void testImmediatelyConnectedCleaned(Selector
selector, boolean closeAft
verifySelectorEmpty(selector);
}
+ /**
+ * Verify that if Selector#connect fails and throws an Exception, all
related objects
+ * are cleared immediately before the exception is propagated.
+ */
+ @Test
+ public void testConnectException() throws Exception {
+ Metrics metrics = new Metrics();
+ AtomicBoolean throwIOException = new AtomicBoolean();
+ Selector selector = new ImmediatelyConnectingSelector(5000, metrics,
time, "MetricGroup", channelBuilder, new LogContext()) {
+ @Override
+ protected SelectionKey registerChannel(String id, SocketChannel
socketChannel, int interestedOps) throws IOException {
+ SelectionKey key = super.registerChannel(id, socketChannel,
interestedOps);
+ key.cancel();
+ if (throwIOException.get())
+ throw new IOException("Test exception");
+ return key;
+ }
+ };
+
+ try {
+ verifyImmediatelyConnectedException(selector, "0");
+ throwIOException.set(true);
+ verifyImmediatelyConnectedException(selector, "1");
+ } finally {
+ selector.close();
+ metrics.close();
+ }
+ }
+
+ private void verifyImmediatelyConnectedException(Selector selector, String
id) throws Exception {
+ try {
+ selector.connect(id, new InetSocketAddress("localhost",
server.port), BUFFER_SIZE, BUFFER_SIZE);
+ fail("Expected exception not thrown");
+ } catch (Exception e) {
+ verifyEmptyImmediatelyConnectedKeys(selector);
+ assertNull("Channel not removed", selector.channel(id));
+ ensureEmptySelectorFields(selector);
+ }
+ }
+
@Test
public void testCloseOldestConnectionWithOneStagedReceive() throws
Exception {
verifyCloseOldestConnectionWithStagedReceives(1);
@@ -715,6 +767,10 @@ private void verifySelectorEmpty(Selector selector) throws
Exception {
}
selector.poll(0);
selector.poll(0); // Poll a second time to clear everything
+ ensureEmptySelectorFields(selector);
+ }
+
+ private void ensureEmptySelectorFields(Selector selector) throws Exception
{
for (Field field : Selector.class.getDeclaredFields()) {
ensureEmptySelectorField(selector, field);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Handle exceptions from immediately connected channels in Selector
> -----------------------------------------------------------------
>
> Key: KAFKA-7712
> URL: https://issues.apache.org/jira/browse/KAFKA-7712
> Project: Kafka
> Issue Type: Bug
> Components: network
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.2.0
>
>
> We try to handle all possible exceptions in Selector to ensure that channels
> are always closed and their states kept consistent. For immediately connected
> channels, we should ensure that any exception during connection results in
> the channel being closed properly and removed from all maps. This is a very
> unlikely scenario, but since we already catch the exception, we should also
> clean up properly in the catch block.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)