Re: Review Request 33065: Patch for KAFKA-1928

2015-06-03 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated June 3, 2015, 10:59 p.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


cleaning up MultiSend, added size() to Send interface


fixed some issues with multisend


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixed metric thingies


fixed response order bug


error handling for illegal selector state and fix metrics bug


optimized selection key lookup with identity hash


fix accidental change


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


addressing Jun's comments


removed connection-aging for clients


fix issues with exception handling and other cleanup


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Revert removed connection-aging for clients

This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.

improving exception handling and other minor fixes


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
changed Send size to long, and fixed an existing issue where Reporters are not 
actually loaded


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


move to single metrics object


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


improvements on new configs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
435fbb5116e80302eba11ed1d3069cb577dbdcbd 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 fac79951d50ef6f19cef5fe62cbc4582b27b145a 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
c5e577ff98bea3de65e290d30065935a29b3247f 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
ded19d85c4605c0ecac0ddca3dc7779d77589ccc 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
023bd2eb6a772d80fe54cbef0182b1b0ad5ef2b3 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d621732889a1c005b243920dc32cea7af66 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-06-03 Thread Gwen Shapira


 On June 1, 2015, 11:52 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/KafkaConfig.scala, line 498
  https://reviews.apache.org/r/33065/diff/17/?file=975403#file975403line498
 
  It's better to define it of type LIST, in the same way as the client. 
  Then, we can use config.getConfiguredInstances to get the reporter 
  instances, which calls configure() for you.

Unfortunately, KafkaConfig doesn't extend AbstractConfig, so it doesn't inherit 
getConfiguredInstances. Thanks for pointing this out, I'll need to call 
configure() myself.

Also, because KafkaConfig does not retain a copy of the original Properties (as 
AbstractConfig does), we will not be able to pass arbitraty properties to 
reporters when we call configure() (which I think was the intent). I am leaving 
this as-is for now, but I think we'll want to sort out KafkaConfig in a 
follow-up.


 On June 1, 2015, 11:52 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 321
  https://reviews.apache.org/r/33065/diff/17/?file=975399#file975399line321
 
  Is this change needed? Isn't it simpler to just use port?

Not strictly needed, but I found it helpful while working on the code.
All our unit tests have port=0. Being able to see the actual port makes 
troubleshooting a bit easier. Since its an INFO level message, I thought its ok 
to do some extra work.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review86095
---


On May 31, 2015, 9:49 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 31, 2015, 9:49 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 move to single metrics object
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-06-03 Thread Gwen Shapira


 On June 1, 2015, 11:15 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
  332
  https://reviews.apache.org/r/33065/diff/17/?file=975364#file975364line332
 
  This is less of a connection id and more of a formatted connection, 
  right? Something like formatConnection?

Removed this. It was a leftover from logic that got refactored out.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review86117
---


On May 31, 2015, 9:49 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 31, 2015, 9:49 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 move to single metrics object
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 ded19d85c4605c0ecac0ddca3dc7779d77589ccc 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 023bd2eb6a772d80fe54cbef0182b1b0ad5ef2b3 
   
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-06-01 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review86117
---



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment138037

This is less of a connection id and more of a formatted connection, 
right? Something like formatConnection?


- Jay Kreps


On May 31, 2015, 9:49 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 31, 2015, 9:49 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 move to single metrics object
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 ded19d85c4605c0ecac0ddca3dc7779d77589ccc 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 023bd2eb6a772d80fe54cbef0182b1b0ad5ef2b3 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-06-01 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review86118
---

Ship it!


Looks good to me.

- Jay Kreps


On May 31, 2015, 9:49 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 31, 2015, 9:49 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 move to single metrics object
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 ded19d85c4605c0ecac0ddca3dc7779d77589ccc 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 023bd2eb6a772d80fe54cbef0182b1b0ad5ef2b3 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-06-01 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review86095
---


Thanks for the latest patch. A few more minor comments below.


clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment138045

We probably should include the metricsTag in the per-node sensor name too. 
It may be worth maintaining a connectionId-sensorName map to avoid the 
overhead of reconstructing the sensorName everytime during recording.



core/src/main/scala/kafka/network/RequestChannel.scala
https://reviews.apache.org/r/33065/#comment138027

This defines a method with void return. We need to define it to return a 
string.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment137997

Our current convention is to not wrap single line statement with {}.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment138008

Is this change needed? Isn't it simpler to just use port?



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/33065/#comment138042

Could we just use consumerId instead of joinGroupRequest.consumerId?



core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/33065/#comment138035

It's better to define it of type LIST, in the same way as the client. Then, 
we can use config.getConfiguredInstances to get the reporter instances, which 
calls configure() for you.



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/33065/#comment138032

Incorrect prefix.



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/33065/#comment138033

config.metricReporterClasses is only used here. I am wondering if it's 
better to define it as a java List to avoid the back-and-forth conversion btw 
scala and java.


- Jun Rao


On May 31, 2015, 9:49 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 31, 2015, 9:49 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 move to single metrics object
 
 
 Merge branch 'trunk' of 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-31 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 31, 2015, 9:49 p.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


cleaning up MultiSend, added size() to Send interface


fixed some issues with multisend


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixed metric thingies


fixed response order bug


error handling for illegal selector state and fix metrics bug


optimized selection key lookup with identity hash


fix accidental change


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


addressing Jun's comments


removed connection-aging for clients


fix issues with exception handling and other cleanup


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Revert removed connection-aging for clients

This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.

improving exception handling and other minor fixes


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
changed Send size to long, and fixed an existing issue where Reporters are not 
actually loaded


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


move to single metrics object


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
435fbb5116e80302eba11ed1d3069cb577dbdcbd 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 b2764df11afa7a99fce46d1ff48960d889032d14 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
ded19d85c4605c0ecac0ddca3dc7779d77589ccc 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
023bd2eb6a772d80fe54cbef0182b1b0ad5ef2b3 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d621732889a1c005b243920dc32cea7af66 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-27 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review85360
---

Ship it!


Thanks for the big patch Gwen.


core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136929

val



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136907

Can we pre-create the MetricName objects outside - to avoid creating these 
short-lived MetricName/HashMap objects for each tick interval of the gauge. Not 
sure if this will end up looking worse though since you have to punch in the 
processor id as well.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136906

`sum / numProcessorThreads`

(also you can do `collection.map(...).sum / numProcessorThreads`)



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136952

Can we have just one metrics object for the entire socket server?



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136928

val


- Joel Koshy


On May 24, 2015, 4:53 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 24, 2015, 4:53 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-27 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review85412
---



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136973

Agreed with Joel's point. We should probably just create a single Metrics 
object in KafkaServer and pass it into SocketServer. Also, we should create it 
in a way similar to that in KafkaProducer. We likely will need to add some new 
metrics related properties in KafkaConfig.


- Jun Rao


On May 24, 2015, 4:53 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 24, 2015, 4:53 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-27 Thread Gwen Shapira


 On May 27, 2015, 6:44 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 109
  https://reviews.apache.org/r/33065/diff/16/?file=971264#file971264line109
 
  Agreed with Joel's point. We should probably just create a single 
  Metrics object in KafkaServer and pass it into SocketServer. Also, we 
  should create it in a way similar to that in KafkaProducer. We likely will 
  need to add some new metrics related properties in KafkaConfig.

I ran into an issue when I tried using a single Metrics object:

I initiated a single Metrics object and passed it into all the Selectors. 
Each selector had its own processor-id tags. When the Selectors initialize, 
each Sensor is re-used by all Selectors. The different MetricsNames with the 
different processor-id tags are all added with to the same Sensor. When we call 
Sensor.record - all statistics are updated, for all Selectors. Which means that 
all Selectors have the same statistics, which cannot be correct.

I solved this with multiple Metrics objects. Did I miss a better solution?


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review85412
---


On May 24, 2015, 4:53 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 24, 2015, 4:53 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-27 Thread Jun Rao


 On May 27, 2015, 6:44 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 109
  https://reviews.apache.org/r/33065/diff/16/?file=971264#file971264line109
 
  Agreed with Joel's point. We should probably just create a single 
  Metrics object in KafkaServer and pass it into SocketServer. Also, we 
  should create it in a way similar to that in KafkaProducer. We likely will 
  need to add some new metrics related properties in KafkaConfig.
 
 Gwen Shapira wrote:
 I ran into an issue when I tried using a single Metrics object:
 
 I initiated a single Metrics object and passed it into all the Selectors. 
 Each selector had its own processor-id tags. When the Selectors 
 initialize, each Sensor is re-used by all Selectors. The different 
 MetricsNames with the different processor-id tags are all added with to the 
 same Sensor. When we call Sensor.record - all statistics are updated, for all 
 Selectors. Which means that all Selectors have the same statistics, which 
 cannot be correct.
 
 I solved this with multiple Metrics objects. Did I miss a better solution?

This seems to be an issue with the Seletor. Basically, when creating a sensor 
in a selector, we didn't include the metricTags in the sensor name. Therefore, 
the sensors in the selector will have the same name if the metric instance is 
the same. We can fix this by including the tags in the sensor name. The sensor 
name just have to be unique and is not tied to the external jmx name. So, we 
can probably just add strings like tagKey-tagValue to the sensor name. This 
will make the construction of the sensor name more expensive. We can probably 
create a separate jira to optimize that a bit (e.g. by caching the global and 
the per node sensor names).


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review85412
---


On May 24, 2015, 4:53 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 24, 2015, 4:53 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-26 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review85180
---

Ship it!


Thanks for the latest patch. +1 other than the following minor comments.


clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment136709

This is actually not an Illegal state exception.



core/src/main/scala/kafka/network/RequestChannel.scala
https://reviews.apache.org/r/33065/#comment136703

This forces the request string to be generated independant of the logging 
mode. Perhaps we can define this as a function?



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136699

No need to assign this to a val.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136693

It seems that we standardize on the dash notation in the new metrics. So, 
instead of SocketServer, it's better to use socket-server.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment136692

Perhaps we can just log the exception.


- Jun Rao


On May 24, 2015, 4:53 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 24, 2015, 4:53 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
 changed Send size to long, and fixed an existing issue where Reporters are 
 not actually loaded
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
 cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-24 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 24, 2015, 4:53 p.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


cleaning up MultiSend, added size() to Send interface


fixed some issues with multisend


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixed metric thingies


fixed response order bug


error handling for illegal selector state and fix metrics bug


optimized selection key lookup with identity hash


fix accidental change


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


addressing Jun's comments


removed connection-aging for clients


fix issues with exception handling and other cleanup


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Revert removed connection-aging for clients

This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.

improving exception handling and other minor fixes


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, 
changed Send size to long, and fixed an existing issue where Reporters are not 
actually loaded


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
435fbb5116e80302eba11ed1d3069cb577dbdcbd 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 b2764df11afa7a99fce46d1ff48960d889032d14 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3aa96c73f52beaeb56b931baf4b026cf21 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d0004c8c46b6664ddaffecc6166d4b47351e5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d621732889a1c005b243920dc32cea7af66 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-21 Thread Gwen Shapira


 On May 21, 2015, 2:11 a.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/network/RequestChannel.scala, line 110
  https://reviews.apache.org/r/33065/diff/15/?file=965056#file965056line110
 
  This would result in the following log entry:
  
  TRACE Completed 
  request:{api_key=12,api_version=0,correlation_id=40,client_id=consumer-1} : 
  {group_id=my-test,group_generation_id=1,consumer_id=e6c857b3-e362-4392-a272-94aa3e42d1fb}
   ...
  
  which is not very intuitive. Could we add a toString function to the 
  requests as we did in the old classes?

This is a bit out of scope for this patch. Can you open a new JIRA? I'll be 
happy to add this.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review84649
---


On May 20, 2015, 10:42 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 20, 2015, 10:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-20 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 20, 2015, 10:42 a.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


cleaning up MultiSend, added size() to Send interface


fixed some issues with multisend


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixed metric thingies


fixed response order bug


error handling for illegal selector state and fix metrics bug


optimized selection key lookup with identity hash


fix accidental change


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


addressing Jun's comments


removed connection-aging for clients


fix issues with exception handling and other cleanup


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Revert removed connection-aging for clients

This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.

improving exception handling and other minor fixes


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
435fbb5116e80302eba11ed1d3069cb577dbdcbd 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 b2764df11afa7a99fce46d1ff48960d889032d14 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3aa96c73f52beaeb56b931baf4b026cf21 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d0004c8c46b6664ddaffecc6166d4b47351e5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d621732889a1c005b243920dc32cea7af66 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-20 Thread Gwen Shapira


 On May 19, 2015, 11:34 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 431
  https://reviews.apache.org/r/33065/diff/14/?file=963545#file963545line431
 
  We probably just want to catch Exception. For unexpected error, we will 
  just let it propagate and kill the processor thread.

I hope you meant that catching Throwable is too much and that we need to handle 
more specific exceptions :)


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review84398
---


On May 19, 2015, 8:26 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 19, 2015, 8:26 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-20 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review84536
---


Thanks for the latest patch. Just two new minor comments and a couple of 
previous comments that haven't been addressed.


clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
https://reviews.apache.org/r/33065/#comment135849

We probably should make this (and the one in ProducerConfig) a bit smaller 
than the broker side default so that the client and server don't try to close 
the same idle socket at the same time. Perhaps sth like 9 mins?



clients/src/main/java/org/apache/kafka/common/network/Send.java
https://reviews.apache.org/r/33065/#comment135852

The issue that I am seeing is the following. When sending data to a socket, 
we use NetworkSend, which prepends a size to the payload buffer. So, in this 
case, size() for NetworkSend is actually 4 + the payload size.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment135850

I was thinking of exposing IO-WAIT-RATIO as a gauge to 
NetworkProcessorAvgIdlePercent. This way, people who rely on Coda Hale metrics 
reporter can still get this important metric as before. In any case, 
IO-WAIT-RATIO is per processor and we want another metric at the aggregate 
level.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment135876

We probably can use the e @ pattern matching to handle both types of 
exceptions.


- Jun Rao


On May 20, 2015, 10:42 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 20, 2015, 10:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-20 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review84649
---



core/src/main/scala/kafka/network/RequestChannel.scala
https://reviews.apache.org/r/33065/#comment135995

Found this problem while working on KAFKA-2208: we need to add the if / 
else for old / new request formats here as well.



core/src/main/scala/kafka/network/RequestChannel.scala
https://reviews.apache.org/r/33065/#comment135997

This would result in the following log entry:

TRACE Completed 
request:{api_key=12,api_version=0,correlation_id=40,client_id=consumer-1} : 
{group_id=my-test,group_generation_id=1,consumer_id=e6c857b3-e362-4392-a272-94aa3e42d1fb}
 ...

which is not very intuitive. Could we add a toString function to the 
requests as we did in the old classes?


- Guozhang Wang


On May 20, 2015, 10:42 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 20, 2015, 10:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Revert removed connection-aging for clients
 
 This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
 
 improving exception handling and other minor fixes
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-19 Thread Gwen Shapira


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
  115
  https://reviews.apache.org/r/33065/diff/12/?file=962798#file962798line115
 
  Do we need to turn this into a map? NetworkReceive already has the 
  connection id.

NetworkReceive has connection ID only if we are sure we got a valid 
NetworkReceive. 
On the other hand, the transmission key can always be used to get a good 
connection ID.

I turned this into a map, so when we are handling exceptions while processing 
completed receives, we'll be able to log and close the right connection without 
assuming that receive.source is valid.

However, on second look, I think any issues with creating the NetworkRecieve 
objects will throw earlier (during poll()) and while processing completed 
receives we can safely use receive.source.

I'll revert this and add a comment to avoid confusing myself again.


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
  330-338
  https://reviews.apache.org/r/33065/diff/12/?file=962798#file962798line330
 
  It doesn't seem that we need this method. Both callers can use 
  transmission.id instead.

Unfortunately, not when we are throwing invalid receive exception. The 
connection details are super important for troubleshooting, so I'm going the 
extra mile to get them when the receive is broken.


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
  587-588
  https://reviews.apache.org/r/33065/diff/12/?file=962798#file962798line587
 
  Perhaps we should rename the method to maybeRegisterConnectionMetrics 
  and node to connectionId.

Note that word node appears in the metrics themselves (the group is 
node-metrics, we have node-... -bytes.sent etc)
It will be clearer to change those to connection, but it will break some 
existing monitoring out there.

Thoughts?


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Send.java, line 56
  https://reviews.apache.org/r/33065/diff/12/?file=962799#file962799line56
 
  This needs to be long since NetworkSend always includes a size buffer, 
  plus the payload buffer.

size variable does not include the size of the size buffer, just the payload 
size (it is actually populated from the size buffer, which is 4 bytes and 
guaranteed to be int)


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, lines 64-67
  https://reviews.apache.org/r/33065/diff/12/?file=962831#file962831line64
 
  Yes, this is the same as IO-WAIT-RATIO. However, until we move all 
  metrics off Coda Hale metrics, we want to expose key metrics in Kafka 
  metrics to Coda Hale as well on the broker side for backward compatibility.

I admit that I don't have a plan on how to expose o.a.k.common.metrics as 
KafkaMetrics.
Do you think the best way to do this is to maintain both concurrently? This can 
lead to small differences and confusion / complaints from people trying to 
migrate.


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java, 
  lines 27-28
  https://reviews.apache.org/r/33065/diff/13/?file=962881#file962881line27
 
  Both should probably be defined as long.

I don't think so. Both refer only to the payload size.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review84158
---


On May 18, 2015, 3:56 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 18, 2015, 3:56 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-19 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 19, 2015, 8:26 a.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


cleaning up MultiSend, added size() to Send interface


fixed some issues with multisend


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixed metric thingies


fixed response order bug


error handling for illegal selector state and fix metrics bug


optimized selection key lookup with identity hash


fix accidental change


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


addressing Jun's comments


removed connection-aging for clients


fix issues with exception handling and other cleanup


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
435fbb5116e80302eba11ed1d3069cb577dbdcbd 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 b2764df11afa7a99fce46d1ff48960d889032d14 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3aa96c73f52beaeb56b931baf4b026cf21 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d621732889a1c005b243920dc32cea7af66 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-19 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review84398
---


Thanks for the latest patch. Only a few more minor comments.

Also, Jay mentioned the use cases for closing idle connections on the client 
side. So, perhaps we can add it back.


core/src/main/scala/kafka/api/FetchResponse.scala
https://reviews.apache.org/r/33065/#comment135643

val should be def.



core/src/main/scala/kafka/api/FetchResponse.scala
https://reviews.apache.org/r/33065/#comment135644

val should be def.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment135662

Should we do the same for IOException as well?



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment135653

We probably just want to catch Exception. For unexpected error, we will 
just let it propagate and kill the processor thread.


- Jun Rao


On May 19, 2015, 8:26 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 19, 2015, 8:26 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-19 Thread Jun Rao


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Send.java, line 56
  https://reviews.apache.org/r/33065/diff/12/?file=962799#file962799line56
 
  This needs to be long since NetworkSend always includes a size buffer, 
  plus the payload buffer.
 
 Gwen Shapira wrote:
 size variable does not include the size of the size buffer, just the 
 payload size (it is actually populated from the size buffer, which is 4 bytes 
 and guaranteed to be int)

The issue that I am seeing is the following. When sending data to a socket, we 
use NetworkSend, which prepends a size to the payload buffer. So, in this case, 
size() for NetworkSend is actually 4 + the payload size.


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, lines 64-67
  https://reviews.apache.org/r/33065/diff/12/?file=962831#file962831line64
 
  Yes, this is the same as IO-WAIT-RATIO. However, until we move all 
  metrics off Coda Hale metrics, we want to expose key metrics in Kafka 
  metrics to Coda Hale as well on the broker side for backward compatibility.
 
 Gwen Shapira wrote:
 I admit that I don't have a plan on how to expose o.a.k.common.metrics as 
 KafkaMetrics.
 Do you think the best way to do this is to maintain both concurrently? 
 This can lead to small differences and confusion / complaints from people 
 trying to migrate.

I was thinking of just exposing IO-WAIT-RATIO as a gauge to 
NetworkProcessorAvgIdlePercent. This way, people who rely on Coda Hale metrics 
reporter can still get this important metric as before.


 On May 18, 2015, 4:53 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
  587-588
  https://reviews.apache.org/r/33065/diff/12/?file=962798#file962798line587
 
  Perhaps we should rename the method to maybeRegisterConnectionMetrics 
  and node to connectionId.
 
 Gwen Shapira wrote:
 Note that word node appears in the metrics themselves (the group is 
 node-metrics, we have node-... -bytes.sent etc)
 It will be clearer to change those to connection, but it will break 
 some existing monitoring out there.
 
 Thoughts?

Yes, we can keep node in the metric name.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review84158
---


On May 19, 2015, 8:26 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 19, 2015, 8:26 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 fix issues with exception handling and other cleanup
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-18 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 18, 2015, 2:58 p.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


cleaning up MultiSend, added size() to Send interface


fixed some issues with multisend


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixed metric thingies


fixed response order bug


error handling for illegal selector state and fix metrics bug


optimized selection key lookup with identity hash


fix accidental change


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


addressing Jun's comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
435fbb5116e80302eba11ed1d3069cb577dbdcbd 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 b2764df11afa7a99fce46d1ff48960d889032d14 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3aa96c73f52beaeb56b931baf4b026cf21 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d0004c8c46b6664ddaffecc6166d4b47351e5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d621732889a1c005b243920dc32cea7af66 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-18 Thread Gwen Shapira


 On May 15, 2015, 10:12 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java, line 
  74
  https://reviews.apache.org/r/33065/diff/11/?file=961086#file961086line74
 
  Actually, with MultiSend, we will be sending a 4-byte size plus the 
  payload, the sum of which could be a bit larger than max_int. So, I think 
  we need to make writeTo and size return long instead in Send. Sorry for the 
  incorrect suggestion earlier.

My understanding is that size() describes just the payload, which is an int, 
writeTo() writes the payload+size which is a bit larger and is therefore a long.


 On May 15, 2015, 10:12 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
  124
  https://reviews.apache.org/r/33065/diff/11/?file=961091#file961091line124
 
  We only need to close idle connections on the server side. So, we can 
  use MAX_LONG for connectionMaxIdleMs for the clients. But we need to see if 
  maybeCloseOldestConnection handles overflows well. Alternatively, we can 
  just not maintain the lruConnection on the client side.

If indeed we don't need to close idle connections on client size, I think just 
not maintaining LRUs (and exiting early from maybeCloseOldest) is the cleanest 
solution.


 On May 15, 2015, 10:12 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 437
  https://reviews.apache.org/r/33065/diff/11/?file=961124#file961124line437
 
  Do we need to close again? We may hit the same exception as before. 
  Also, see the comment above.

Good point.

In the trunk code, this is safe, since we have the selection key at this point 
and can safely just close this one connection. 
With Selector, we just have a list of Recieves, which may not have a valid dest 
and therefore we may be unable to recover the selection key.

However, without the selection key, we can't log the host/port of the client 
that sent invalid request, which will make debugging nearly impossible.

I think the right solution will be to turn completedRecieves into a Map that 
includes both the receive and the client and server host/port pairs from the 
selection key. This should let us close the right connection and log the right 
message, the same way SocketServer currently does.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review83959
---


On May 15, 2015, 7:30 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 15, 2015, 7:30 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-18 Thread Gwen Shapira


 On May 15, 2015, 10:12 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 354
  https://reviews.apache.org/r/33065/diff/11/?file=961124#file961124line354
 
  I guess we will make use of this in a followup patch?

Actually, I think its no longer needed and can be removed.

Wouldn't IO-WAIT-RATIO include the same information?


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review83959
---


On May 15, 2015, 7:30 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 15, 2015, 7:30 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-18 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 18, 2015, 3:56 p.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


cleaning up MultiSend, added size() to Send interface


fixed some issues with multisend


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixed metric thingies


fixed response order bug


error handling for illegal selector state and fix metrics bug


optimized selection key lookup with identity hash


fix accidental change


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


addressing Jun's comments


removed connection-aging for clients


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
435fbb5116e80302eba11ed1d3069cb577dbdcbd 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 b2764df11afa7a99fce46d1ff48960d889032d14 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3aa96c73f52beaeb56b931baf4b026cf21 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d621732889a1c005b243920dc32cea7af66 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 
ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
1c3b3802ac221d570e7610458e50518b4499e7ed 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-18 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review84158
---


Thanks for the latest patch. Looks good. Just a few minor comments below.

Also, could you make a pass to remove unused imports? I saw unused imports in 
at least BlockingChannel.


clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment135283

Do we need to turn this into a map? NetworkReceive already has the 
connection id.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment135289

The comment should probably say per connection metrics.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment135284

It doesn't seem that we need this method. Both callers can use 
transmission.id instead.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment135286

Perhaps we should rename the method to maybeRegisterConnectionMetrics and 
node to connectionId.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment135287

node = connectionId?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment135288

node = connectionId?



clients/src/main/java/org/apache/kafka/common/network/Send.java
https://reviews.apache.org/r/33065/#comment135278

This needs to be long since NetworkSend always includes a size buffer, plus 
the payload buffer.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment135279

Yes, this is the same as IO-WAIT-RATIO. However, until we move all metrics 
off Coda Hale metrics, we want to expose key metrics in Kafka metrics to Coda 
Hale as well on the broker side for backward compatibility.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment135280

We are not registering for read here yet.



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
https://reviews.apache.org/r/33065/#comment135292

Both should probably be defined as long.



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
https://reviews.apache.org/r/33065/#comment135290

No longer needed since we are returning long.



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
https://reviews.apache.org/r/33065/#comment135291

No need to cast.


- Jun Rao


On May 18, 2015, 3:56 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 18, 2015, 3:56 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 fix accidental change
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 addressing Jun's comments
 
 
 removed connection-aging for clients
 
 
 Diffs
 -
 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-18 Thread Gwen Shapira


 On May 15, 2015, 10:12 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java,
   lines 159-162
  https://reviews.apache.org/r/33065/diff/11/?file=961076#file961076line159
 
  We don't need this since the idle conenctions only need to be closed on 
  the server side.

I think Jay mentioned somewhere on this RB that he thinks that closing idle 
connections will also be useful on the client, which is why I added this 
configuration.


 On May 15, 2015, 10:12 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java,
   lines 172-175
  https://reviews.apache.org/r/33065/diff/11/?file=961081#file961081line172
 
  We don't need this since the idle conenctions only need to be closed on 
  the server side.

See above...


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review83959
---


On May 15, 2015, 7:30 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 15, 2015, 7:30 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 cleaning up MultiSend, added size() to Send interface
 
 
 fixed some issues with multisend
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 fixed metric thingies
 
 
 fixed response order bug
 
 
 error handling for illegal selector state and fix metrics bug
 
 
 optimized selection key lookup with identity hash
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d621732889a1c005b243920dc32cea7af66 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-15 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 15, 2015, 7:30 a.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


cleaning up MultiSend, added size() to Send interface


fixed some issues with multisend


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


fixed metric thingies


fixed response order bug


error handling for illegal selector state and fix metrics bug


optimized selection key lookup with identity hash


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
435fbb5116e80302eba11ed1d3069cb577dbdcbd 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3aa96c73f52beaeb56b931baf4b026cf21 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d0004c8c46b6664ddaffecc6166d4b47351e5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d621732889a1c005b243920dc32cea7af66 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 
ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-15 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review83959
---


Thanks for the patch. There seem to be some compilation errors. Perhaps need to 
rebase.

:core:compileScala/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala:68:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
  ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala:65:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
  ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/FetchRequest.scala:152:
 type mismatch;
 found   : kafka.api.FetchResponseSend
 required: kafka.network.Send
requestChannel.sendResponse(new RequestChannel.Response(request, new 
FetchResponseSend(request.connectionId, errorResponse)))
 ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala:189:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
  ^

/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:78:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:167:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, commitResponse)))
  ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetFetchRequest.scala:99:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
  ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetRequest.scala:121:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
  ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/network/RequestChannel.scala:29:
 imported `Send' is permanently hidden by definition of trait Send in package 
network
import org.apache.kafka.common.network.Send


clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
https://reviews.apache.org/r/33065/#comment135068

We don't need this since the idle conenctions only need to be closed on the 
server side.



clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
https://reviews.apache.org/r/33065/#comment135072

We don't need this since the idle conenctions only need to be closed on the 
server side.



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
https://reviews.apache.org/r/33065/#comment135073

Actually, with MultiSend, we will be sending a 4-byte size plus the 
payload, the sum of which could be a bit larger than max_int. So, I think we 
need to make writeTo and size return long instead in Send. Sorry for the 
incorrect suggestion earlier.



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
https://reviews.apache.org/r/33065/#comment135074

To be consistent, no need to wrap single line statement in {}.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment135059

It seems that we don't really need IdenityHashMap to optimize performance. 
In the following link, HashMap gives comparable performance as IdenityHashMap 
on String keys since String caches the hashcode.
http://java-performance.info/java-util-identityhashmap/



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment135056

We only need to close 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-14 Thread Gwen Shapira


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 455
  https://reviews.apache.org/r/33065/diff/4/?file=947565#file947565line455
 
  This is just registering for reads, not for writes, right?

Maybe I misunderstood, but it looks like selector.send registers for writing?


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
  311-315
  https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311
 
  In this case, we should probably just propagate the exception and 
  potentially kill the caller, instead of continue, right?

Are you saying this due the the warning or the exception itself?

The warning is misplaced - we don't validate requests at this later. Illegal 
state exception will be due to our own bugs mostly - such as attempting to send 
data while another send is in progress. I think continuing here is fine?

I'm fixing the error message and adding code to handle invalid requests in 
SocketServer. The SocketServer exception handler will basically print an error, 
close the connection and continue.


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, lines 367-373
  https://reviews.apache.org/r/33065/diff/4/?file=947565#file947565line367
 
  Currently, Selector exposes metrics per connection. While this works 
  fine for producer/consumer clients since the number of brokers is typically 
  small, I am concerned about having those on the server since there could be 
  10s of thousands of connections in a broker.
  
  Perhaps we can pass in a config to disable per node metric in Selector 
  when used in the broker.
  
  We probably should think through if there is any metrics in Selector 
  that we want to expose in Coda Hale metrics.

This makes sense. 
Actually, the current code has metrics per Processor. I think we can go with a 
single Metrics object for all processors and tag each processor. 
It will be nice to have a tag for protocol too, but I don't see how we can do 
it with each processor handling all protocols.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
---


On May 12, 2015, 9:58 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 12, 2015, 9:58 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-14 Thread Gwen Shapira


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
  311-315
  https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311
 
  In this case, we should probably just propagate the exception and 
  potentially kill the caller, instead of continue, right?
 
 Gwen Shapira wrote:
 Are you saying this due the the warning or the exception itself?
 
 The warning is misplaced - we don't validate requests at this later. 
 Illegal state exception will be due to our own bugs mostly - such as 
 attempting to send data while another send is in progress. I think continuing 
 here is fine?
 
 I'm fixing the error message and adding code to handle invalid requests 
 in SocketServer. The SocketServer exception handler will basically print an 
 error, close the connection and continue.
 
 Jun Rao wrote:
 If we get into an IllegalStateException, the state may not be fixable. 
 So, it's probably better to let the caller deal with it. On the server side, 
 we will probably just log an error and stop the processor thread.

Thats a good point - we don't know what caused us to get to an illegal state, 
and therefore don't know if they rest of the connections for the processor are 
OK or not.

However, in the current server code, we never close processor due to errors. We 
close connections, but processors stick around. This makes sense since we never 
re-open processors, so closing the processor permenantly reduces the server's 
capacity. 

I can move handling to the SocketServer (actually, the docs say that we throw 
this exception - so I have to move it), but I still think the right handling is 
to log an error and keep trying to process other connections.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
---


On May 14, 2015, 9:51 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 14, 2015, 9:51 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc2 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd52 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d000 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d6 
   clients/src/main/java/org/apache/kafka/common/Node.java f4e4186 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae82 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e1 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-14 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 14, 2015, 9:30 p.m.)


Review request for kafka.


Changes
---

Addressed Jun's comments


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc2 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd52 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d000 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d6 
  clients/src/main/java/org/apache/kafka/common/Node.java f4e4186 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae82 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e1 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de058 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a0 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf39 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab1 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b27889 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b380 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635 
  core/src/main/scala/kafka/api/FetchRequest.scala b038c15 
  core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 431190a 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 317daed 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala fa8bd6a 
  core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc 
  core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987 
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala 363bae0 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397 
  core/src/main/scala/kafka/client/ClientUtils.scala 62394c0 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala aa8d940 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 6cf13f0 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-14 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review83841
---



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment134928

Actually, thinking about this a bit more, we can't unmute here. Instead, we 
need to wait until the send completes before we can unmute. Otherwise, two 
responses for the same connection may be out of order. We can call 
selector.completedSends() to get all completed sends and then unmute.


- Jun Rao


On May 12, 2015, 9:58 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 12, 2015, 9:58 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 42b12928781463b56fc4a45d96bb4da2745b6d95 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-14 Thread Jun Rao


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
  311-315
  https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311
 
  In this case, we should probably just propagate the exception and 
  potentially kill the caller, instead of continue, right?
 
 Gwen Shapira wrote:
 Are you saying this due the the warning or the exception itself?
 
 The warning is misplaced - we don't validate requests at this later. 
 Illegal state exception will be due to our own bugs mostly - such as 
 attempting to send data while another send is in progress. I think continuing 
 here is fine?
 
 I'm fixing the error message and adding code to handle invalid requests 
 in SocketServer. The SocketServer exception handler will basically print an 
 error, close the connection and continue.

If we get into an IllegalStateException, the state may not be fixable. So, it's 
probably better to let the caller deal with it. On the server side, we will 
probably just log an error and stop the processor thread.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
---


On May 12, 2015, 9:58 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 12, 2015, 9:58 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 42b12928781463b56fc4a45d96bb4da2745b6d95 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-14 Thread Gwen Shapira


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
  311-315
  https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311
 
  In this case, we should probably just propagate the exception and 
  potentially kill the caller, instead of continue, right?
 
 Gwen Shapira wrote:
 Are you saying this due the the warning or the exception itself?
 
 The warning is misplaced - we don't validate requests at this later. 
 Illegal state exception will be due to our own bugs mostly - such as 
 attempting to send data while another send is in progress. I think continuing 
 here is fine?
 
 I'm fixing the error message and adding code to handle invalid requests 
 in SocketServer. The SocketServer exception handler will basically print an 
 error, close the connection and continue.
 
 Jun Rao wrote:
 If we get into an IllegalStateException, the state may not be fixable. 
 So, it's probably better to let the caller deal with it. On the server side, 
 we will probably just log an error and stop the processor thread.
 
 Gwen Shapira wrote:
 Thats a good point - we don't know what caused us to get to an illegal 
 state, and therefore don't know if they rest of the connections for the 
 processor are OK or not.
 
 However, in the current server code, we never close processor due to 
 errors. We close connections, but processors stick around. This makes sense 
 since we never re-open processors, so closing the processor permenantly 
 reduces the server's capacity. 
 
 I can move handling to the SocketServer (actually, the docs say that we 
 throw this exception - so I have to move it), but I still think the right 
 handling is to log an error and keep trying to process other connections.

Never mind. Even though I dislike losing a processor, we have no choice:
There's no connection to close, since the illegal state may be due to 
connection not existing in first place.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
---


On May 14, 2015, 9:51 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 14, 2015, 9:51 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc2 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd52 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 8e336a3 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d000 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 1e943d6 
   clients/src/main/java/org/apache/kafka/common/Node.java f4e4186 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-14 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 14, 2015, 9:51 p.m.)


Review request for kafka.


Changes
---

Addressed Jun's comments


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc2 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd52 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d000 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d6 
  clients/src/main/java/org/apache/kafka/common/Node.java f4e4186 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae82 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e1 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de058 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a0 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf39 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab1 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b27889 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b380 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635 
  core/src/main/scala/kafka/api/FetchRequest.scala b038c15 
  core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 431190a 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 317daed 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala fa8bd6a 
  core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc 
  core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987 
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala 363bae0 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397 
  core/src/main/scala/kafka/client/ClientUtils.scala 62394c0 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala aa8d940 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 6cf13f0 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-14 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 15, 2015, 12:07 a.m.)


Review request for kafka.


Changes
---

Fixed bug in metrics update that caused hangs, and corrected illegal state 
handling per Jun's suggestion.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc2 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd52 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
8e336a3 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d000 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
1e943d6 
  clients/src/main/java/org/apache/kafka/common/Node.java f4e4186 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae82 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e1 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de058 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a0 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf39 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab1 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b27889 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b380 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635 
  core/src/main/scala/kafka/api/FetchRequest.scala b038c15 
  core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 431190a 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 317daed 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala fa8bd6a 
  core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc 
  core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987 
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala 363bae0 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397 
  core/src/main/scala/kafka/client/ClientUtils.scala 62394c0 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala aa8d940 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-13 Thread Gwen Shapira


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
  198-199
  https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line198
 
  Could we just iterate the keySet directly instead of making a copy 
  first?

We can't. I want to avoid unnecessary copies, but couldn't find a way out of 
this one.
Close(key) actually removes elements from the keySet. The iterator will throw 
an exception regarding mutating iterator if another function will remove items 
from the set during the iteration, so we need to iterate over a copy.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
---


On May 12, 2015, 9:58 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 12, 2015, 9:58 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 added idString to node API, changed written to int in Send API
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 42b12928781463b56fc4a45d96bb4da2745b6d95 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   clients/src/main/java/org/apache/kafka/common/Node.java 
 f4e4186c7602787e58e304a2f1c293a633114656 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-12 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 12, 2015, 9 a.m.)


Review request for kafka.


Summary (updated)
-

Patch for KAFKA-1928


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
42b12928781463b56fc4a45d96bb4da2745b6d95 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d0004c8c46b6664ddaffecc6166d4b47351e5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 
ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
1c3b3802ac221d570e7610458e50518b4499e7ed 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
a3b1b78adb760eaeb029466b54f335a29caf3b0f 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
  core/src/main/scala/kafka/api/FetchRequest.scala 
b038c15186c0cbcc65b59479324052498361b717 
  core/src/main/scala/kafka/api/FetchResponse.scala 
75aaf57fb76ec01660d93701a57ae953d877d81c 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-12 Thread Gwen Shapira


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java, 
  lines 55-64
  https://reviews.apache.org/r/33065/diff/4/?file=947526#file947526line55
 
  Since we always use a 4 byte size, perhaps we should make both 
  remaining() and writeTo() return int?

Ooh, didn't notice that. Makes sense. Thanks!


 On May 6, 2015, 10:50 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java, line 
  45
  https://reviews.apache.org/r/33065/diff/4/?file=947527#file947527line45
 
  Not sure why we need to pass in expectedBytesToWrite. This can be 
  computed from the sends.

It can't. Sends (especially the streaming fetch responses) don't implement 
size().
If you look at how expectedBytesToWrite is calculated, it is size of object 
minus size of headers. We probably don't need this logic in the network 
layer, where it can be part of FetchResponseSend.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
---


On May 12, 2015, 9 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 12, 2015, 9 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: 1928 and KAFKA-1928
 https://issues.apache.org/jira/browse/1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 removed reify and remaining from send/recieve API, per Jun. moved 
 maybeCloseOldest() to Selector per Jay
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 bdff518b732105823058e6182f445248b45dc388 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 42b12928781463b56fc4a45d96bb4da2745b6d95 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 187d0004c8c46b6664ddaffecc6166d4b47351e5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-12 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 12, 2015, 9:58 a.m.)


Review request for kafka.


Bugs: 1928 and KAFKA-1928
https://issues.apache.org/jira/browse/1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


removed reify and remaining from send/recieve API, per Jun. moved 
maybeCloseOldest() to Selector per Jay


added idString to node API, changed written to int in Send API


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
bdff518b732105823058e6182f445248b45dc388 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
42b12928781463b56fc4a45d96bb4da2745b6d95 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
187d0004c8c46b6664ddaffecc6166d4b47351e5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/Node.java 
f4e4186c7602787e58e304a2f1c293a633114656 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 
ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
1c3b3802ac221d570e7610458e50518b4499e7ed 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
a3b1b78adb760eaeb029466b54f335a29caf3b0f 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
  core/src/main/scala/kafka/api/FetchRequest.scala 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-08 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82347
---



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
https://reviews.apache.org/r/33065/#comment133063

Not sure why we are checking if the node is non-negative. But the semantic 
meaning of the code here has changed.


- Jiangjie Qin


On May 1, 2015, 10:45 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 10:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 5e3fab13e3c02eb351558ec973b949b3d1196085 
   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
 8b278892883e63899b53e15efb9d8c926131e858 
   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
 d5b306b026e788b4e5479f3419805aa49ae889f3 
   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
 ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 1c3b3802ac221d570e7610458e50518b4499e7ed 
   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
 a3b1b78adb760eaeb029466b54f335a29caf3b0f 
   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
 fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 b038c15186c0cbcc65b59479324052498361b717 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 75aaf57fb76ec01660d93701a57ae953d877d81c 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-06 Thread Jun Rao


 On April 18, 2015, 12:49 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, lines 66-72
  https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line66
 
  I am wondering if we need both completed() and remaining() in Send. It 
  seems that one of the two is enough for our usage.
  
  Also, not sure how useful reify() is. Currently, it's not used anywhere.
 
 Gwen Shapira wrote:
 reification is a very central concept in Kafka's The Trial :)
 
 I agree that reify() and remaining() can be removed without any pain. 
 Since it is a public API, do we need to deprecate them first?

Yes, we should we able to remove them since the api that's public is in 
kafka.javaapi.FetchResponse.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review80557
---


On May 1, 2015, 10:45 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 10:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 5e3fab13e3c02eb351558ec973b949b3d1196085 
   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
 8b278892883e63899b53e15efb9d8c926131e858 
   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
 d5b306b026e788b4e5479f3419805aa49ae889f3 
   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
 ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-06 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
---


Thanks for the patch. Looks great overall! Some comments below.

Also, there seems to be compilation error when running tests.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/33065/#comment133525

Instead of doing the id to string conversion every time, perhaps we can 
compute the string of the id once in Node and expose it through an idString() 
method?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/33065/#comment133526

Perhaps changing nodeId to node.



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/33065/#comment133527

Perhaps changing id to node?



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
https://reviews.apache.org/r/33065/#comment133529

Since we always use a 4 byte size, perhaps we should make both remaining() 
and writeTo() return int?



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
https://reviews.apache.org/r/33065/#comment133564

Not sure why we need to pass in expectedBytesToWrite. This can be computed 
from the sends.



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
https://reviews.apache.org/r/33065/#comment133532

Instead of moving back the cursor in the iterator, would it be simpler to 
maintain a current Send that's being iterated on?



clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
https://reviews.apache.org/r/33065/#comment133534

request size probably should be renamed to receive size since it can be for 
either request or response.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133512

There is a followup patch in KAFKA-1282 that we haven't incorporated. 
Bascially, the linked hash map needs to be in access order. Since we are moving 
code around, could you incorporate the change here?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133537

Instead of -1, perhaps we can reuse NetworkReceive.UNLIMITED?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133541

Could we just iterate the keySet directly instead of making a copy first?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133543

It's reasonable to have a size for Send. The server size only records sent 
bytes, instead of messages.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133544

In this case, we should probably just propagate the exception and 
potentially kill the caller, instead of continue, right?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133548

Perhaps we could pass in connectionMaxIdleNanos to the constructor of 
Selector. Then this method can be private. For clients, we can default to max 
long so that they could close idle connections since the broker is doing that. 
The broker can pass in the connectionMaxIdleNanos from config.



core/src/main/scala/kafka/network/RequestChannel.scala
https://reviews.apache.org/r/33065/#comment133506

We probably don't need remoteAddress any more since that's part of 
connectionId now.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133395

Could we define both methods as override?



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133567

Currently, Selector exposes metrics per connection. While this works fine 
for producer/consumer clients since the number of brokers is typically small, I 
am concerned about having those on the server since there could be 10s of 
thousands of connections in a broker.

Perhaps we can pass in a config to disable per node metric in Selector when 
used in the broker.

We probably should think through if there is any metrics in Selector that 
we want to expose in Coda Hale metrics.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133500

It seems that we don't need startSelectTime. We will need to expose the 
io-wait-ratio metric in selector to networkProcessor.IdlePercent. This can be 
done in a followup jira though if the patch is too big.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133495

No need for return value.



core/src/main/scala/kafka/network/SocketServer.scala

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-01 Thread Gwen Shapira


 On May 1, 2015, 9:25 p.m., Jay Kreps wrote:
  core/src/main/scala/kafka/network/SocketServer.scala, line 382
  https://reviews.apache.org/r/33065/diff/3/?file=946988#file946988line382
 
  Beautiful! So much deleted code!

Haha :) Yeah, I love how minimal SocketServer became.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82281
---


On May 1, 2015, 10:45 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 10:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 5e3fab13e3c02eb351558ec973b949b3d1196085 
   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
 8b278892883e63899b53e15efb9d8c926131e858 
   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
 d5b306b026e788b4e5479f3419805aa49ae889f3 
   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
 ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 1c3b3802ac221d570e7610458e50518b4499e7ed 
   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
 a3b1b78adb760eaeb029466b54f335a29caf3b0f 
   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
 fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 b038c15186c0cbcc65b59479324052498361b717 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 75aaf57fb76ec01660d93701a57ae953d877d81c 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-01 Thread Gwen Shapira


 On April 18, 2015, 12:49 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java, 
  line 55
  https://reviews.apache.org/r/33065/diff/1/?file=922619#file922619line55
 
  Do you know why the return is changed from int to long?

writeTo() and remaining() should agree on type, since remaining is calculated 
from sendSize minus a sum of writeTo() calls.

We can either change writeTo() to int or remaining() to long. 
I believe either will work, and I chose to change remaining().


 On April 18, 2015, 12:49 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, lines 66-72
  https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line66
 
  I am wondering if we need both completed() and remaining() in Send. It 
  seems that one of the two is enough for our usage.
  
  Also, not sure how useful reify() is. Currently, it's not used anywhere.

reification is a very central concept in Kafka's The Trial :)

I agree that reify() and remaining() can be removed without any pain. Since it 
is a public API, do we need to deprecate them first?


 On April 18, 2015, 12:49 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, line 70
  https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line70
 
  This probably should be def reify() {}?

errr... why? is it a coding standard issue? I think the above is correct code 
for reify as a function that returns null (which is what Send API specified if 
we don't have a buffer to return). Anyway, this is irrelevant if we are 
removing reify.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review80557
---


On May 1, 2015, 12:48 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 12:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-01 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 1, 2015, 10:45 p.m.)


Review request for kafka.


Bugs: KAFKA-1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
as well


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 
ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
1c3b3802ac221d570e7610458e50518b4499e7ed 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
a3b1b78adb760eaeb029466b54f335a29caf3b0f 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
  core/src/main/scala/kafka/api/FetchRequest.scala 
b038c15186c0cbcc65b59479324052498361b717 
  core/src/main/scala/kafka/api/FetchResponse.scala 
75aaf57fb76ec01660d93701a57ae953d877d81c 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
431190ab94afc4acfc14348a1fc720e17c071cea 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
cf8e6acc426aef6eb19d862bf6a108a5fc37907a 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
67811a752a470bf9bdbc8c5419e8d6e20a006169 
  core/src/main/scala/kafka/api/OffsetRequest.scala 
3d483bc7518ad76f9548772522751afb4d046b78 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
570b2da1d865086f9830aa919a49063abbbe574d 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 
5e14987c990fe561c01dac2909f5ed21a506e038 
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
363bae01752318f3849242b97a6619747697c1d9 
  

Re: Review Request 33065: Patch for KAFKA-1928

2015-05-01 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82281
---


I did a high level pass. On the whole I think this is awesome! I agree we need 
to rationalize the metrics now that we have a bunch of instrumentation at the 
selector level we need a lot less at the socket server level.


clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133003

Does this need to be public? Instead could we have maxIdleTime be a 
configurable param and have this call happen from within select?

We didn't have this functionality in the client before, but actually the 
client also needs connection LRU so I think doing it on both sides is good.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133004

Beautiful! So much deleted code!


- Jay Kreps


On May 1, 2015, 12:48 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 12:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 5e3fab13e3c02eb351558ec973b949b3d1196085 
   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
 8b278892883e63899b53e15efb9d8c926131e858 
   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
 d5b306b026e788b4e5479f3419805aa49ae889f3 
   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
 ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 1c3b3802ac221d570e7610458e50518b4499e7ed 
   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
 a3b1b78adb760eaeb029466b54f335a29caf3b0f 
   

Re: Review Request 33065: Patch for KAFKA-1928

2015-04-30 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/
---

(Updated May 1, 2015, 12:48 a.m.)


Review request for kafka.


Bugs: KAFKA-1928
https://issues.apache.org/jira/browse/KAFKA-1928


Repository: kafka


Description (updated)
---

first pass on replacing Send


implement maxSize and improved docs


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Conflicts:
core/src/main/scala/kafka/network/RequestChannel.scala

moved selector out of abstract thread


mid-way through putting selector in SocketServer


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2

Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass.

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1928-v2


renamed requestKey to connectionId to reflect new use and changed type from Any 
to String


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
936487b16e7ac566f8bdcd39a7240ceb619fd30e 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
1311f85847b022efec8cb05c450bb18231db6979 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 e55ab11df4db0b0084f841a74cbcf819caf780d5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
129ae827bccbd982ad93d56e46c6f5c46f147fe0 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
c8213e156ec9c9af49ee09f5238492318516aaa3 
  clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
fc0d168324aaebb97065b0aafbd547a1994d76a7 
  clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
68327cd3a734fd429966d3e2016a2488dbbb19e5 
  clients/src/main/java/org/apache/kafka/common/network/Receive.java 
4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
  clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
5d321a09e470166a1c33639cf0cab26a3bce98ec 
  clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
27cbf390c7f148ffa8c5abc154c72cbf0829715c 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java 
5e3fab13e3c02eb351558ec973b949b3d1196085 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
8b278892883e63899b53e15efb9d8c926131e858 
  clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
d5b306b026e788b4e5479f3419805aa49ae889f3 
  clients/src/test/java/org/apache/kafka/test/MockSelector.java 
ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
1c3b3802ac221d570e7610458e50518b4499e7ed 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
a3b1b78adb760eaeb029466b54f335a29caf3b0f 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
  core/src/main/scala/kafka/api/FetchRequest.scala 
b038c15186c0cbcc65b59479324052498361b717 
  core/src/main/scala/kafka/api/FetchResponse.scala 
75aaf57fb76ec01660d93701a57ae953d877d81c 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
431190ab94afc4acfc14348a1fc720e17c071cea 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
cf8e6acc426aef6eb19d862bf6a108a5fc37907a 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
67811a752a470bf9bdbc8c5419e8d6e20a006169 
  core/src/main/scala/kafka/api/OffsetRequest.scala 
3d483bc7518ad76f9548772522751afb4d046b78 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
570b2da1d865086f9830aa919a49063abbbe574d 
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 
5e14987c990fe561c01dac2909f5ed21a506e038 
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
363bae01752318f3849242b97a6619747697c1d9 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
  core/src/main/scala/kafka/client/ClientUtils.scala 

Re: Review Request 33065: Patch for KAFKA-1928

2015-04-20 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review80761
---



clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
https://reviews.apache.org/r/33065/#comment130886

So I think you are saying this will move to a String, right? I think that 
makes sense...


This looks great to me (yay deleting code!).

One potential gotcha when converting the socket server on the server to use 
Selector is the differing assumptions on the read interest and write interest 
state transitions. The clients, being clients, are interested in writing when 
they have outstanding stuff to send, and are always interested in reading. 
However the server only allows a single request per connection to be processed 
at any given time, and this is a particularly important to guarantee ordering 
since there are multiple processing threads. I think this can be implemented in 
the socket server by just calling mute() after a completed request is read, but 
I just wanted to point out this difference as it is a little subtle.

- Jay Kreps


On April 10, 2015, 4:58 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated April 10, 2015, 4:58 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 1c3b3802ac221d570e7610458e50518b4499e7ed 
   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
 a3b1b78adb760eaeb029466b54f335a29caf3b0f 
   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
 5be393ab8272a49437b5057ed098ccdc42f352e5 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 75aaf57fb76ec01660d93701a57ae953d877d81c 
   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
 2fad585f126699ba8d26c901a41bcf6b8198bf62 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 cf8e6acc426aef6eb19d862bf6a108a5fc37907a 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 67811a752a470bf9bdbc8c5419e8d6e20a006169 
   core/src/main/scala/kafka/api/OffsetRequest.scala 
 3d483bc7518ad76f9548772522751afb4d046b78 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 570b2da1d865086f9830aa919a49063abbbe574d 
   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
 5e14987c990fe561c01dac2909f5ed21a506e038 
   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
 7dca09ce637a40e125de05703dc42e8b611971ac 
   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
 69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
   core/src/main/scala/kafka/client/ClientUtils.scala 
 b66424b230463df6641a848b99bb73312ea66e33 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 cbef84ac76e62768981f74e71d451f2bda995275 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 e250b94626c62b3b7f33ee4180ca3ab69a8821d6 
   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
 97acdb23f6e95554c3e0357aa112eddfc875efbc 
   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
 b0b7be14d494ae8c87f4443b52db69d273c20316 
   core/src/main/scala/kafka/network/BlockingChannel.scala 
 6e2a38eee8e568f9032f95c75fa5899e9715b433 
   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala 
 c0d77261353478232ab85591c182be57845b3f13 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 b95b73b71252932867c60192b3d5b91efe99e122 
   core/src/main/scala/kafka/network/ByteBufferSend.scala 
 af30042a4c713418ecd83b6c6c17dfcbdc101c62 
   core/src/main/scala/kafka/network/Handler.scala 
 a0300336b8cb5a2d5be68b7b48bdbe045bf99324 
   core/src/main/scala/kafka/network/RequestChannel.scala 
 1d9c57b0b5a0ad31e4f3d7562f0266af83cc9024 
   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION 
   core/src/main/scala/kafka/network/SocketServer.scala 
 

Re: Review Request 33065: Patch for KAFKA-1928

2015-04-17 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review80557
---


Took a quick look. Overall, looks good. A few comments below.


clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
https://reviews.apache.org/r/33065/#comment130596

Do you know why the return is changed from int to long?



core/src/main/scala/kafka/api/FetchResponse.scala
https://reviews.apache.org/r/33065/#comment130594

I am wondering if we need both completed() and remaining() in Send. It 
seems that one of the two is enough for our usage.

Also, not sure how useful reify() is. Currently, it's not used anywhere.



core/src/main/scala/kafka/api/FetchResponse.scala
https://reviews.apache.org/r/33065/#comment130592

This probably should be def reify() {}?



core/src/main/scala/kafka/network/Transmission.scala
https://reviews.apache.org/r/33065/#comment130609

Probably this can be moved to the client side?



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/33065/#comment130610

Are those needed since we are importing kafka.api._?


- Jun Rao


On April 10, 2015, 4:58 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated April 10, 2015, 4:58 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 1c3b3802ac221d570e7610458e50518b4499e7ed 
   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
 a3b1b78adb760eaeb029466b54f335a29caf3b0f 
   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
 5be393ab8272a49437b5057ed098ccdc42f352e5 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 75aaf57fb76ec01660d93701a57ae953d877d81c 
   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
 2fad585f126699ba8d26c901a41bcf6b8198bf62 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 cf8e6acc426aef6eb19d862bf6a108a5fc37907a 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 67811a752a470bf9bdbc8c5419e8d6e20a006169 
   core/src/main/scala/kafka/api/OffsetRequest.scala 
 3d483bc7518ad76f9548772522751afb4d046b78 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 570b2da1d865086f9830aa919a49063abbbe574d 
   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
 5e14987c990fe561c01dac2909f5ed21a506e038 
   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
 7dca09ce637a40e125de05703dc42e8b611971ac 
   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
 69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
   core/src/main/scala/kafka/client/ClientUtils.scala 
 b66424b230463df6641a848b99bb73312ea66e33 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 cbef84ac76e62768981f74e71d451f2bda995275 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 e250b94626c62b3b7f33ee4180ca3ab69a8821d6 
   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
 97acdb23f6e95554c3e0357aa112eddfc875efbc 
   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
 b0b7be14d494ae8c87f4443b52db69d273c20316 
   core/src/main/scala/kafka/network/BlockingChannel.scala 
 6e2a38eee8e568f9032f95c75fa5899e9715b433 
   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala 
 c0d77261353478232ab85591c182be57845b3f13 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 b95b73b71252932867c60192b3d5b91efe99e122 
   core/src/main/scala/kafka/network/ByteBufferSend.scala 
 af30042a4c713418ecd83b6c6c17dfcbdc101c62 
   core/src/main/scala/kafka/network/Handler.scala 
 a0300336b8cb5a2d5be68b7b48bdbe045bf99324 
   core/src/main/scala/kafka/network/RequestChannel.scala 
 1d9c57b0b5a0ad31e4f3d7562f0266af83cc9024 
   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION