Repository: kafka
Updated Branches:
  refs/heads/trunk d0ce0a95d -> cf94b188f


MINOR: Use correct connectionId in SocketServer log message

Also add connection id to KafkaChannel exception message

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #3529 from rajinisivaram/MINOR-log-connection-id


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf94b188
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf94b188
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf94b188

Branch: refs/heads/trunk
Commit: cf94b188f1678276a4a6df6b87d89f2d4d2e922c
Parents: d0ce0a9
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Mon Jul 17 09:40:55 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Mon Jul 17 09:41:29 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/common/network/KafkaChannel.java  | 2 +-
 core/src/main/scala/kafka/network/SocketServer.scala        | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cf94b188/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 5e3a895..b563c4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -152,7 +152,7 @@ public class KafkaChannel {
 
     public void setSend(Send send) {
         if (this.send != null)
-            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
         this.send = send;
         this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf94b188/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 9088eb5..2ba5553 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -482,16 +482,17 @@ private[kafka] class Processor(val id: Int,
 
   /* `protected` for test usage */
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
-    trace(s"Socket server received response to send, registering for write and sending data: $response")
-    val channel = selector.channel(responseSend.destination)
+    val connectionId = response.request.connectionId
+    trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
+    val channel = selector.channel(connectionId)
     // `channel` can be null if the selector closed the connection because it was idle for too long
     if (channel == null) {
-      warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
+      warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
       response.request.updateRequestMetrics(0L)
     }
     else {
       selector.send(responseSend)
-      inflightResponses += (response.request.connectionId -> response)
+      inflightResponses += (connectionId -> response)
     }
   }
 

Reply via email to