kafka git commit: MINOR: Use correct connectionId in SocketServer log message
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 335b9e48e -> 973f9d2b2

MINOR: Use correct connectionId in SocketServer log message

Also add connection id to KafkaChannel exception message

Author: Rajini Sivaram
Reviewers: Jason Gustafson, Ismael Juma

Closes #3529 from rajinisivaram/MINOR-log-connection-id

(cherry picked from commit cf94b188f1678276a4a6df6b87d89f2d4d2e922c)
Signed-off-by: Ismael Juma

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/973f9d2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/973f9d2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/973f9d2b

Branch: refs/heads/0.11.0
Commit: 973f9d2b2a308f81ba33fb2e9613d1c729e2b023
Parents: 335b9e4
Author: Rajini Sivaram
Authored: Mon Jul 17 09:40:55 2017 +0100
Committer: Ismael Juma
Committed: Mon Jul 17 09:51:14 2017 +0100

--
 .../java/org/apache/kafka/common/network/KafkaChannel.java | 2 +-
 core/src/main/scala/kafka/network/SocketServer.scala       | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/973f9d2b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 5e3a895..b563c4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -152,7 +152,7 @@ public class KafkaChannel {
     public void setSend(Send send) {
         if (this.send != null)
-            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
         this.send = send;
         this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
     }
http://git-wip-us.apache.org/repos/asf/kafka/blob/973f9d2b/core/src/main/scala/kafka/network/SocketServer.scala
--
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 414557e..6db70cf 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -482,16 +482,17 @@ private[kafka] class Processor(val id: Int,
   /* `protected` for test usage */
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
-    trace(s"Socket server received response to send, registering for write and sending data: $response")
-    val channel = selector.channel(responseSend.destination)
+    val connectionId = response.request.connectionId
+    trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
+    val channel = selector.channel(connectionId)
     // `channel` can be null if the selector closed the connection because it was idle for too long
     if (channel == null) {
-      warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
+      warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
       response.request.updateRequestMetrics(0L)
     } else {
       selector.send(responseSend)
-      inflightResponses += (response.request.connectionId -> response)
+      inflightResponses += (connectionId -> response)
     }
   }
kafka git commit: MINOR: Use correct connectionId in SocketServer log message
Repository: kafka
Updated Branches:
  refs/heads/trunk d0ce0a95d -> cf94b188f

MINOR: Use correct connectionId in SocketServer log message

Also add connection id to KafkaChannel exception message

Author: Rajini Sivaram
Reviewers: Jason Gustafson, Ismael Juma

Closes #3529 from rajinisivaram/MINOR-log-connection-id

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf94b188
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf94b188
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf94b188

Branch: refs/heads/trunk
Commit: cf94b188f1678276a4a6df6b87d89f2d4d2e922c
Parents: d0ce0a9
Author: Rajini Sivaram
Authored: Mon Jul 17 09:40:55 2017 +0100
Committer: Ismael Juma
Committed: Mon Jul 17 09:41:29 2017 +0100

--
 .../java/org/apache/kafka/common/network/KafkaChannel.java | 2 +-
 core/src/main/scala/kafka/network/SocketServer.scala       | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf94b188/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 5e3a895..b563c4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -152,7 +152,7 @@ public class KafkaChannel {
     public void setSend(Send send) {
         if (this.send != null)
-            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
         this.send = send;
         this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf94b188/core/src/main/scala/kafka/network/SocketServer.scala
--
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 9088eb5..2ba5553 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -482,16 +482,17 @@ private[kafka] class Processor(val id: Int,
   /* `protected` for test usage */
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
-    trace(s"Socket server received response to send, registering for write and sending data: $response")
-    val channel = selector.channel(responseSend.destination)
+    val connectionId = response.request.connectionId
+    trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
+    val channel = selector.channel(connectionId)
     // `channel` can be null if the selector closed the connection because it was idle for too long
     if (channel == null) {
-      warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
+      warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
       response.request.updateRequestMetrics(0L)
     } else {
       selector.send(responseSend)
-      inflightResponses += (response.request.connectionId -> response)
+      inflightResponses += (connectionId -> response)
     }
   }