kafka git commit: MINOR: Use correct connectionId in SocketServer log message

2017-07-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 335b9e48e -> 973f9d2b2


MINOR: Use correct connectionId in SocketServer log message

Also add connection id to KafkaChannel exception message

Author: Rajini Sivaram 

Reviewers: Jason Gustafson , Ismael Juma 

Closes #3529 from rajinisivaram/MINOR-log-connection-id

(cherry picked from commit cf94b188f1678276a4a6df6b87d89f2d4d2e922c)
Signed-off-by: Ismael Juma 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/973f9d2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/973f9d2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/973f9d2b

Branch: refs/heads/0.11.0
Commit: 973f9d2b2a308f81ba33fb2e9613d1c729e2b023
Parents: 335b9e4
Author: Rajini Sivaram 
Authored: Mon Jul 17 09:40:55 2017 +0100
Committer: Ismael Juma 
Committed: Mon Jul 17 09:51:14 2017 +0100

--
 .../java/org/apache/kafka/common/network/KafkaChannel.java  | 2 +-
 core/src/main/scala/kafka/network/SocketServer.scala        | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/973f9d2b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 5e3a895..b563c4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -152,7 +152,7 @@ public class KafkaChannel {
 
 public void setSend(Send send) {
 if (this.send != null)
-throw new IllegalStateException("Attempt to begin a send operation 
with prior send operation still in progress.");
+throw new IllegalStateException("Attempt to begin a send operation 
with prior send operation still in progress, connection id is " + id);
 this.send = send;
 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/973f9d2b/core/src/main/scala/kafka/network/SocketServer.scala
--
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 414557e..6db70cf 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -482,16 +482,17 @@ private[kafka] class Processor(val id: Int,
 
   /* `protected` for test usage */
   protected[network] def sendResponse(response: RequestChannel.Response, 
responseSend: Send) {
-trace(s"Socket server received response to send, registering for write and 
sending data: $response")
-val channel = selector.channel(responseSend.destination)
+val connectionId = response.request.connectionId
+trace(s"Socket server received response to send to $connectionId, 
registering for write and sending data: $response")
+val channel = selector.channel(connectionId)
 // `channel` can be null if the selector closed the connection because it 
was idle for too long
 if (channel == null) {
-  warn(s"Attempting to send response via channel for which there is no 
open connection, connection id $id")
+  warn(s"Attempting to send response via channel for which there is no 
open connection, connection id $connectionId")
   response.request.updateRequestMetrics(0L)
 }
 else {
   selector.send(responseSend)
-  inflightResponses += (response.request.connectionId -> response)
+  inflightResponses += (connectionId -> response)
 }
   }
 



kafka git commit: MINOR: Use correct connectionId in SocketServer log message

2017-07-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk d0ce0a95d -> cf94b188f


MINOR: Use correct connectionId in SocketServer log message

Also add connection id to KafkaChannel exception message

Author: Rajini Sivaram 

Reviewers: Jason Gustafson , Ismael Juma 

Closes #3529 from rajinisivaram/MINOR-log-connection-id


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf94b188
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf94b188
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf94b188

Branch: refs/heads/trunk
Commit: cf94b188f1678276a4a6df6b87d89f2d4d2e922c
Parents: d0ce0a9
Author: Rajini Sivaram 
Authored: Mon Jul 17 09:40:55 2017 +0100
Committer: Ismael Juma 
Committed: Mon Jul 17 09:41:29 2017 +0100

--
 .../java/org/apache/kafka/common/network/KafkaChannel.java  | 2 +-
 core/src/main/scala/kafka/network/SocketServer.scala        | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/cf94b188/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 5e3a895..b563c4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -152,7 +152,7 @@ public class KafkaChannel {
 
 public void setSend(Send send) {
 if (this.send != null)
-throw new IllegalStateException("Attempt to begin a send operation 
with prior send operation still in progress.");
+throw new IllegalStateException("Attempt to begin a send operation 
with prior send operation still in progress, connection id is " + id);
 this.send = send;
 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf94b188/core/src/main/scala/kafka/network/SocketServer.scala
--
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 9088eb5..2ba5553 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -482,16 +482,17 @@ private[kafka] class Processor(val id: Int,
 
   /* `protected` for test usage */
   protected[network] def sendResponse(response: RequestChannel.Response, 
responseSend: Send) {
-trace(s"Socket server received response to send, registering for write and 
sending data: $response")
-val channel = selector.channel(responseSend.destination)
+val connectionId = response.request.connectionId
+trace(s"Socket server received response to send to $connectionId, 
registering for write and sending data: $response")
+val channel = selector.channel(connectionId)
 // `channel` can be null if the selector closed the connection because it 
was idle for too long
 if (channel == null) {
-  warn(s"Attempting to send response via channel for which there is no 
open connection, connection id $id")
+  warn(s"Attempting to send response via channel for which there is no 
open connection, connection id $connectionId")
   response.request.updateRequestMetrics(0L)
 }
 else {
   selector.send(responseSend)
-  inflightResponses += (response.request.connectionId -> response)
+  inflightResponses += (connectionId -> response)
 }
   }