[ https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869167#comment-17869167 ]
bboyleonp commented on KAFKA-16701:
-----------------------------------
[~gharris1727] I added custom logs to narrow down the issue and found the following information:
Log for test `closingChannelSendFailure` on JDK 17 ({color:#de350b}Fail{color})
{code:java}
[2024-07-28 19:46:31,751] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:46:31,751] DEBUG Leon: request obtained by callbackQueue is null (kafka.request.logger:471)
[2024-07-28 19:46:31,751] DEBUG Leon: the connectionQuotas of inetAddress is 1 (kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] DEBUG Leon: the connectionQuotas of inetAddress is 0 (kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] ERROR Exception while processing disconnection of 127.0.0.1:51325-127.0.0.1:51327-0 (kafka.network.SocketServerTest$TestableProcessor:76)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /127.0.0.1
    at kafka.network.ConnectionQuotas.$anonfun$dec$1(SocketServer.scala:1535)
    at scala.collection.mutable.HashMap.getOrElse(HashMap.scala:451)
    at kafka.network.ConnectionQuotas.dec(SocketServer.scala:1535)
    at kafka.network.Processor.$anonfun$processDisconnected$1(SocketServer.scala:1225)
    at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008)
    at kafka.network.Processor.processDisconnected(SocketServer.scala:1216)
    at kafka.network.Processor.run(SocketServer.scala:1019)
    at java.base/java.lang.Thread.run(Thread.java:840)
[2024-07-28 19:46:41,755] DEBUG Leon: request obtained by requestQueue is null (kafka.request.logger:476)
[2024-07-28 19:46:41,755] DEBUG Leon: before finally::proxyServer.close (kafka:703) {code}
Log for test `closingChannelSendFailure` on JDK 11 ({color:#00875a}Success{color})
{code:java}
[2024-07-28 19:24:48,265] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:24:48,265] DEBUG Leon: request obtained by callbackQueue is null (kafka.request.logger:471)
[2024-07-28 19:24:48,265] DEBUG Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":0,"requestApiVersion":11,"correlationId":-1,"clientId":"","requestApiKeyName":"PRODUCE"},"request":{"transactionalId":null,"acks":0,"timeoutMs":10000,"topicData":[]},"response":"","connection":"127.0.0.1:50229-127.0.0.1:50231-0","totalTimeMs":104.595,"requestQueueTimeMs":0.0,"localTimeMs":3.2080946725E7,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.316,"sendTimeMs":0.03,"securityProtocol":"SSL","principal":"User:ANONYMOUS","listener":"SSL","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}} (kafka.request.logger:279)
[2024-07-28 19:24:48,265] TRACE Socket server received empty response to send, registering for read: Response(type=NoOp, request=Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, session=org.apache.kafka.network.Session@242ff747, listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None)) (kafka.network.SocketServerTest$TestableProcessor:54)
[2024-07-28 19:24:48,266] TRACE Processor 0 received request: RequestHeader(apiKey=PRODUCE, apiVersion=11, clientId=, correlationId=-1, headerVersion=2) -- {acks=0,timeout=10000,partitionSizes=[]} (kafka.network.RequestChannel$:45)
[2024-07-28 19:24:48,266] DEBUG Leon: request obtained by requestQueue is Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, session=org.apache.kafka.network.Session@5329f6b3, listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None) (kafka.request.logger:476)
[2024-07-28 19:24:48,267] DEBUG Leon: before finally::proxyServer.close (kafka:703)
[2024-07-28 19:24:48,267] DEBUG Leon: before sslConnect (kafka:1469)
[2024-07-28 19:24:48,268] DEBUG Leon: before sendRequest (kafka:1471) {code}
This test calls `receiveRequest` in _RequestChannel.scala_, which polls `callbackQueue` and then `requestQueue`.
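To make the polling order concrete, here is a simplified, hypothetical Scala model of a two-queue `receiveRequest`: a non-blocking poll of `callbackQueue`, then a timed poll of `requestQueue`. The class name `TwoQueueChannel` and the `Option` return type are assumptions for illustration; Kafka's real `RequestChannel.receiveRequest` differs in detail. In the JDK 17 log both polls return null, and the `requestQueue` line appears about 10 seconds after the `callbackQueue` line, which is what a timed poll that expires without receiving anything would produce.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical, simplified model of the polling order described above;
// not Kafka's actual RequestChannel implementation.
final class TwoQueueChannel[A](capacity: Int) {
  // Requests that should be handled ahead of regular requests.
  val callbackQueue = new ArrayBlockingQueue[A](capacity)
  // Regular requests handed over by the network processor.
  val requestQueue = new ArrayBlockingQueue[A](capacity)

  /** Returns the next request, or None if nothing arrives within timeoutMs. */
  def receiveRequest(timeoutMs: Long): Option[A] =
    // 1. Non-blocking check of the callback queue (poll() returns null if empty).
    Option(callbackQueue.poll())
      // 2. Only if that was empty, block on the request queue for up to timeoutMs.
      //    orElse takes its argument by name, so this poll is skipped otherwise.
      .orElse(Option(requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)))
}

object TwoQueueDemo {
  def main(args: Array[String]): Unit = {
    val channel = new TwoQueueChannel[String](8)
    channel.requestQueue.put("produce-request")
    println(channel.receiveRequest(100)) // Some(produce-request)
    println(channel.receiveRequest(100)) // None, after waiting roughly 100 ms
  }
}
```

In this model, a test that waits on `receiveRequest` only sees `None` when the processor never enqueued the request before the timeout, which matches the "receiveRequest timed out" assertion in the quoted failures.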
I found a daemon in _SocketServer.scala_ that monitors and counts the active connections, which are tracked by `connectionQuotas`. It seems that on JDK 17 the connections are disconnected unexpectedly. *Please see the 2 lines marked in red.*
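The JDK 17 log shows the count for /127.0.0.1 going from 1 to 0 and then an `IllegalArgumentException` on a further decrement. Below is a minimal sketch of a per-address counter that behaves this way, assuming (as the stack trace through `HashMap.getOrElse` suggests) that an address is removed from the map once its count reaches zero, so a second decrement for the same address has nothing to look up. `ConnectionCounter` and its methods are hypothetical; this is not Kafka's `ConnectionQuotas` class.

```scala
import java.net.InetAddress
import scala.collection.mutable

// Hypothetical, simplified per-address connection counter that reproduces the
// log pattern above; not Kafka's ConnectionQuotas class.
final class ConnectionCounter {
  private val counts = mutable.Map.empty[InetAddress, Int]

  def inc(address: InetAddress): Unit = counts.synchronized {
    counts(address) = counts.getOrElse(address, 0) + 1
  }

  // Assumption: the entry is removed when the count reaches zero, so a later
  // decrement for the same address fails the lookup and throws.
  def dec(address: InetAddress): Unit = counts.synchronized {
    val current = counts.getOrElse(address, throw new IllegalArgumentException(
      s"Attempted to decrease connection count for address with no connections, address: $address"))
    if (current == 1) counts.remove(address) else counts(address) = current - 1
  }

  def get(address: InetAddress): Int = counts.synchronized {
    counts.getOrElse(address, 0)
  }
}

object ReplayDemo {
  def main(args: Array[String]): Unit = {
    val counter = new ConnectionCounter
    val localhost = InetAddress.getByName("127.0.0.1")
    counter.inc(localhost)
    println(counter.get(localhost)) // 1
    counter.dec(localhost)          // first disconnection: 1 -> 0 (entry removed)
    println(counter.get(localhost)) // 0
    // A second disconnection for the same address hits the missing entry.
    try counter.dec(localhost)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

Replaying the observed sequence this way suggests that, on JDK 17, two disconnections are processed for an address that had only one counted connection; the sketch does not explain why that happens.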
*Could you check whether you see the same behavior in your environment?* Here is the code snippet I use to print out `connectionQuotas`:
{code:java}
private def processDisconnected(): Unit = {
  selector.disconnected.keySet.forEach { connectionId =>
    try {
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
      // the channel has been closed by the selector but the quotas still need to be updated
      val inetAddress = InetAddress.getByName(remoteHost)
      // custom debug line: print the current count for this address before decrementing
      debug(s"Leon: the connectionQuotas of inetAddress is ${connectionQuotas.get(inetAddress)}")
      connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
    } catch {
      case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
    }
  }
} {code}
It is rare for `connectionQuotas` to drop this quickly. I found only 2 such cases under JDK 17, and `closingChannelSendFailure` is the only one where the count drops to 0.
At this stage, I cannot come up with a valid root cause for this issue. Any ideas?
> Some SocketServerTest buffered close tests flaky failing locally
> ----------------------------------------------------------------
>
> Key: KAFKA-16701
> URL: https://issues.apache.org/jira/browse/KAFKA-16701
> Project: Kafka
> Issue Type: Test
> Components: core, unit tests
> Affects Versions: 3.5.0, 3.6.0, 3.7.0
> Reporter: Greg Harris
> Assignee: bboyleonp
> Priority: Major
> Labels: flaky-test
>
> These tests are failing for me in a local development environment, but don't
> appear to be flaky or failing in CI. They only appear to fail for JDK >= 17.
> I'm using an M1 Mac, so it is possible that either the Mac's linear port
> allocation or a native implementation is impacting this.
> closingChannelSendFailure()
>
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>     at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>     at kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690)
>     at kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434)
>     at kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430)
>     at kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425)
> {noformat}
> closingChannelWithBufferedReceivesFailedSend()
>
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>     at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>     at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>     at kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520)
> {noformat}
> closingChannelWithCompleteAndIncompleteBufferedReceives()
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>     at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>     at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>     at kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511)
> {noformat}
> remoteCloseWithBufferedReceives()
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>     at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>     at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>     at kafka.network.SocketServerTest.remoteCloseWithBufferedReceives(SocketServerTest.scala:1453)
> {noformat}