Re: Review Request 26885: Patch for KAFKA-1642

2014-10-27 Thread Ewen Cheslack-Postava


 On Oct. 27, 2014, 12:13 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 122
  https://reviews.apache.org/r/26885/diff/3/?file=731636#file731636line122
 
  The comment "When connecting or connected, this handles slow/stalled 
  connections" here is a bit misleading: after checking the code I realize 
  connectionDelay is only triggered to determine the delay in millis after 
  which we can re-check connectivity for a node that is not connected, and 
  hence if the node is connected again while we are determining its delay, 
  we just set it to MAX.
  
  Instead of making it general to the KafkaClient interface, shall we 
  just add this to the code block of line 155?

It gets triggered any time NetworkClient.ready returns false for a node. The 
obvious case is that ready returns false when the node is disconnected, but it 
also does so when the node is connecting, or when it is connected but 
inFlightRequests.canSendMore() returns false (thus the mention of slow/stalled 
connections). The important thing is that the value returned *is* MAX_VALUE in 
those latter cases because neither one will be resolved by polling -- they both 
require an external event (connection established/failed, or an outstanding 
request receives a response), which should wake up the event loop when there's 
something to do. That keeps us from polling unnecessarily. Previously there 
were conditions in which connections in these states could trigger busy 
waiting of the poll loop.
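
To make the three cases concrete, here is a minimal sketch of the behaviour 
described above. It is not the code from the patch: the class, the State enum 
and the parameter names are hypothetical, and the real logic lives in 
NetworkClient and ClusterConnectionStates.

```java
/** Hypothetical sketch of the connectionDelay semantics; not the patch itself. */
final class ConnectionDelaySketch {

    /** Simplified connection states (assumed names). */
    enum State { DISCONNECTED, CONNECTING, CONNECTED }

    /**
     * Returns how long to wait, in milliseconds, before it is worth
     * re-checking a node for which ready() returned false.
     *
     * @param lastAttemptMs      time of the last connection attempt (assumed field)
     * @param reconnectBackoffMs reconnect backoff (assumed to come from config)
     */
    static long connectionDelayMs(State state, long lastAttemptMs,
                                  long reconnectBackoffMs, long nowMs) {
        if (state == State.DISCONNECTED) {
            // Only this case is resolved by waiting: retry once the backoff elapses.
            long waitedMs = nowMs - lastAttemptMs;
            return Math.max(reconnectBackoffMs - waitedMs, 0L);
        }
        // CONNECTING, or CONNECTED with no room for more in-flight requests:
        // polling sooner cannot help, so wait for an external event to wake
        // the event loop instead of busy-waiting.
        return Long.MAX_VALUE;
    }
}
```

With a 100 ms backoff, a node that last attempted a connection 40 ms ago 
yields 60, while a connecting or connected node yields Long.MAX_VALUE.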

I don't think we can get the same effect just inlining the code because it uses 
state that's only available through ClusterConnectionStates, which is private 
to NetworkClient. The KafkaClient only exposes the higher level concept of 
ready.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review58575
---


 




Re: Review Request 26885: Patch for KAFKA-1642

2014-10-26 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review58575
---

Ship it!


LGTM, with one minor comment below.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/26885/#comment99633

The comment "When connecting or connected, this handles slow/stalled 
connections" here is a bit misleading: after checking the code I realize 
connectionDelay is only triggered to determine the delay in millis after which 
we can re-check connectivity for a node that is not connected, and hence if the 
node is connected again while we are determining its delay, we just set it to 
MAX.

Instead of making it general to the KafkaClient interface, shall we just 
add this to the code block of line 155?


- Guozhang Wang


 




Re: Review Request 26885: Patch for KAFKA-1642

2014-10-23 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review57513
---


Thanks for the patch. Looks good to me. Some minor comments below.


clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
https://reviews.apache.org/r/26885/#comment98242

It's probably better to put them on two lines.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
https://reviews.apache.org/r/26885/#comment98234

It seems that expired can just be timeLeftMs <= 0?



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
https://reviews.apache.org/r/26885/#comment98236

Are these comments redundant now given the new comments above?



clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
https://reviews.apache.org/r/26885/#comment98388

It seems that in this case, the nextReadyCheckDelayMs should be the 
remaining linger time for tp1, which is lingerMs/2. Should we just assert that?


- Jun Rao


 




Re: Review Request 26885: Patch for KAFKA-1642

2014-10-23 Thread Ewen Cheslack-Postava


 On Oct. 23, 2014, 9:43 p.m., Jun Rao wrote:
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java,
   lines 197-199
  https://reviews.apache.org/r/26885/diff/2/?file=726776#file726776line197
 
  It seems that in this case, the nextReadyCheckDelayMs should be the 
  remaining linger time for tp1, which is lingerMs/2. Should we just assert 
  that?

tp1 and tp2 have the same leader, node1. The test adds enough data to make tp2 
sendable, so in the ideal case only tp3 would be used to determine the timeout, 
which should require lingerMs more time. However, the test checks for <= 
lingerMs because a single scan through the topic partitions means that we can 
incorporate the lingerMs/2 timeout from tp1 even though we determine later that 
we really want to ignore it (and I actually saw this happen when I initially 
wrote the test to check for the exact value). I think the tradeoff of sometimes 
waking up a bit earlier than needed is probably worthwhile since it keeps the 
implementation simpler and cheaper.
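
The order dependence can be illustrated with a small sketch. This is an 
assumption-laden simplification, not RecordAccumulator itself: the Batch 
class, the integer node ids and the method name are invented for the example, 
and "sendable" is reduced to "full or timed out".

```java
/** Hypothetical single-pass scan; names and types are illustrative only. */
final class ReadyScanSketch {

    /** One pending batch per topic partition (simplified, assumed shape). */
    static final class Batch {
        final int leader;       // index of the leader node
        final boolean full;     // enough data accumulated to send now
        final long timeLeftMs;  // remaining linger/backoff time

        Batch(int leader, boolean full, long timeLeftMs) {
            this.leader = leader;
            this.full = full;
            this.timeLeftMs = timeLeftMs;
        }
    }

    /** Scans the batches once and returns the delay until the next ready check. */
    static long nextReadyCheckDelayMs(Batch[] batches, int nodeCount) {
        boolean[] ready = new boolean[nodeCount];
        long delayMs = Long.MAX_VALUE;
        for (Batch b : batches) {
            if (ready[b.leader]) {
                continue; // leader already known to be ready; ignore this batch
            }
            if (b.full || b.timeLeftMs <= 0) {
                ready[b.leader] = true;
            } else {
                // The leader may turn out to be ready later in the same scan,
                // but this batch's delay has already been folded in.
                delayMs = Math.min(delayMs, b.timeLeftMs);
            }
        }
        return delayMs;
    }
}
```

With lingerMs = 10, scanning tp1 (half lingered), tp2 (full), tp3 (fresh) in 
that order gives 5, while scanning tp2 first gives 10. Both satisfy the 
<= lingerMs assertion, which is why the test does not check an exact value.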


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review57513
---


 




Re: Review Request 26885: Patch for KAFKA-1642

2014-10-23 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/
---

(Updated Oct. 23, 2014, 11:19 p.m.)


Review request for kafka.


Bugs: KAFKA-1642
https://issues.apache.org/jira/browse/KAFKA-1642


Repository: kafka


Description (updated)
---

Fixes two issues with the computation of ready nodes and poll timeouts in
Sender/RecordAccumulator:

1. The timeout was computed incorrectly because it took into account all nodes,
even if they had data to send such that their timeout would be 0. However, nodes
were then filtered based on whether it was possible to send (i.e. their
connection was still good) which could result in nothing to send and a 0
timeout, resulting in busy looping. Instead, the timeout needs to be computed
only using data that cannot be immediately sent, i.e. where the timeout will be
greater than 0. This timeout is only used if, after filtering by whether
connections are ready for sending, there is no data to be sent. Other events can
wake the thread up earlier, e.g. a client reconnects and becomes ready again.

2. One of the conditions indicating whether data is sendable is whether a
timeout has expired -- either the linger time or the retry backoff. This
condition wasn't accounting for both cases properly, always using the linger
time. This means the retry backoff was probably not being respected.
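
As a rough illustration of the second issue, the sketch below picks the wait 
period according to whether the batch is backing off after a failed attempt. 
The class, method and parameter names are assumptions made for this example 
and are not taken from the patch.

```java
/** Hypothetical sketch of the linger-vs-backoff choice; not the patch code. */
final class TimeLeftSketch {

    /**
     * Remaining time before a batch becomes sendable because of a timeout.
     *
     * @param waitedMs       time since the batch was created or last attempted
     * @param backingOff     true if the batch is being retried and must back off
     * @param lingerMs       linger time for fresh batches
     * @param retryBackoffMs backoff time for retried batches
     */
    static long timeLeftMs(long waitedMs, boolean backingOff,
                           long lingerMs, long retryBackoffMs) {
        // The bug described above amounts to always using lingerMs here.
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        return Math.max(timeToWaitMs - waitedMs, 0L);
    }

    /** A batch has timed out exactly when no wait time remains. */
    static boolean expired(long waitedMs, boolean backingOff,
                           long lingerMs, long retryBackoffMs) {
        return timeLeftMs(waitedMs, backingOff, lingerMs, retryBackoffMs) <= 0;
    }
}
```

For example, with lingerMs = 5 and retryBackoffMs = 100, a batch that has 
waited 10 ms is expired if it is fresh but still has 90 ms left if it is 
backing off.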

KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
none can send data because they are in a connection backoff period.


Addressing Jun's comments.


Diffs (updated)
---

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9
  clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14

Diff: https://reviews.apache.org/r/26885/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



Re: Review Request 26885: Patch for KAFKA-1642

2014-10-20 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/
---

(Updated Oct. 21, 2014, 12:34 a.m.)


Review request for kafka.


Bugs: KAFKA-1642
https://issues.apache.org/jira/browse/KAFKA-1642


Repository: kafka


Description (updated)
---

Fixes two issues with the computation of ready nodes and poll timeouts in
Sender/RecordAccumulator:

1. The timeout was computed incorrectly because it took into account all nodes,
even if they had data to send such that their timeout would be 0. However, nodes
were then filtered based on whether it was possible to send (i.e. their
connection was still good) which could result in nothing to send and a 0
timeout, resulting in busy looping. Instead, the timeout needs to be computed
only using data that cannot be immediately sent, i.e. where the timeout will be
greater than 0. This timeout is only used if, after filtering by whether
connections are ready for sending, there is no data to be sent. Other events can
wake the thread up earlier, e.g. a client reconnects and becomes ready again.

2. One of the conditions indicating whether data is sendable is whether a
timeout has expired -- either the linger time or the retry backoff. This
condition wasn't accounting for both cases properly, always using the linger
time. This means the retry backoff was probably not being respected.
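
The first issue can be sketched as follows. This is a simplified model under 
stated assumptions: nodes are array indices, and the method and parameter 
names are hypothetical rather than the actual Sender code.

```java
/** Hypothetical sketch of choosing the poll timeout; not the Sender code. */
final class PollTimeoutSketch {

    /**
     * @param hasSendableData       per node: the accumulator has data ready for it
     * @param connectionReady       per node: the connection can accept a request now
     * @param connectionDelayMs     per node: delay before re-checking a non-ready node
     * @param nextReadyCheckDelayMs delay until data that is not yet sendable becomes so
     */
    static long pollTimeoutMs(boolean[] hasSendableData, boolean[] connectionReady,
                              long[] connectionDelayMs, long nextReadyCheckDelayMs) {
        boolean anythingToSend = false;
        long notReadyTimeoutMs = Long.MAX_VALUE;
        for (int node = 0; node < hasSendableData.length; node++) {
            if (!hasSendableData[node]) {
                continue;
            }
            if (connectionReady[node]) {
                anythingToSend = true;
            } else {
                // Data is waiting but the node cannot take it; do not let this
                // contribute a timeout of 0, which would cause a busy loop.
                notReadyTimeoutMs = Math.min(notReadyTimeoutMs, connectionDelayMs[node]);
            }
        }
        if (anythingToSend) {
            return 0L; // there is work to do right away
        }
        return Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
    }
}
```

When the only node with sendable data is in a 50 ms connection backoff, this 
returns 50 instead of 0, so the sender thread sleeps rather than spinning.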

KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
none can send data because they are in a connection backoff period.


Diffs (updated)
---

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9
  clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14

Diff: https://reviews.apache.org/r/26885/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



Review Request 26885: Patch for KAFKA-1642

2014-10-17 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/
---

Review request for kafka.


Bugs: KAFKA-1642
https://issues.apache.org/jira/browse/KAFKA-1642


Repository: kafka


Description
---

Fixes two issues with computation of poll timeouts in Sender/RecordAccumulator.
First, the timeout was being computed by RecordAccumulator as it looked up which
nodes had data to send, but the timeout cannot be computed until after nodes
that aren't ready for sending are filtered since this could result in a node
that is currently unreachable always returning a timeout of 0 and triggering a
busy loop. The fixed version computes per-node timeouts and only computes the
final timeout after nodes that aren't ready for sending are removed.

Second, timeouts were only being computed based on the first TopicAndPartition
encountered for each node. This could result in incorrect timeouts if the first
encountered didn't have the minimum timeout for that node. This now evaluates
every TopicAndPartition with a known leader and takes the minimum.
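
A minimal sketch of the approach in this first revision, assuming partitions 
and nodes are identified by array index and an unknown leader is encoded as 
-1. All names here are hypothetical and chosen only for illustration.

```java
/** Hypothetical sketch of per-node timeouts; not the RecordAccumulator code. */
final class PerNodeTimeoutSketch {

    /**
     * Computes, for each node, the minimum timeout over every partition it leads.
     *
     * @param leaderOf           per partition: leader node index, or -1 if unknown
     * @param partitionTimeoutMs per partition: time until its data is sendable
     */
    static long[] perNodeTimeoutsMs(int[] leaderOf, long[] partitionTimeoutMs, int nodeCount) {
        long[] timeouts = new long[nodeCount];
        for (int n = 0; n < nodeCount; n++) {
            timeouts[n] = Long.MAX_VALUE;
        }
        for (int p = 0; p < leaderOf.length; p++) {
            int leader = leaderOf[p];
            if (leader < 0) {
                continue; // no known leader: this partition cannot set a timeout
            }
            // Consider every partition, not just the first one seen for the node.
            timeouts[leader] = Math.min(timeouts[leader], partitionTimeoutMs[p]);
        }
        return timeouts;
    }

    /** Final timeout: minimum over the nodes that remain after filtering. */
    static long finalTimeoutMs(long[] perNodeTimeoutsMs, boolean[] nodeReady) {
        long timeoutMs = Long.MAX_VALUE;
        for (int n = 0; n < perNodeTimeoutsMs.length; n++) {
            if (nodeReady[n]) {
                timeoutMs = Math.min(timeoutMs, perNodeTimeoutsMs[n]);
            }
        }
        return timeoutMs;
    }
}
```

Filtering before taking the minimum is the point: a node that is currently 
unreachable never gets to contribute its timeout to the final value.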


Diffs
---

  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14

Diff: https://reviews.apache.org/r/26885/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava