Re: Review Request 26885: Patch for KAFKA-1642
> On Oct. 27, 2014, 12:13 a.m., Guozhang Wang wrote:
> clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 122
> https://reviews.apache.org/r/26885/diff/3/?file=731636#file731636line122
>
>     The comments "When connecting or connected, this handles slow/stalled connections" here are a bit misleading: after checking the code I realize connectionDelay is only triggered to determine the delay in milliseconds after which we can re-check connectivity for a node that is not connected, and hence if the node is connected again while we are determining its delay, we just set it to MAX. Instead of making it general to the KafkaClient interface, shall we just add this to the code block of line 155?

It gets triggered any time NetworkClient.ready returns false for a node. The obvious case is that it will return not ready when disconnected, but it also does so when connecting, or when connected but inFlightRequests.canSendMore() returns false (thus the mention of slow/stalled connections). The important thing is that the value returned *is* MAX_VALUE in those latter cases, because neither one will be resolved by polling: they both require an external event (the connection is established or fails, or an outstanding request receives a response), which should wake up the event loop when there's something to do. That keeps us from polling unnecessarily. Previously there were conditions in which connections in these states could trigger busy waiting of the poll loop.

I don't think we can get the same effect just by inlining the code, because it uses state that's only available through ClusterConnectionStates, which is private to NetworkClient. The KafkaClient only exposes the higher-level concept of ready.


- Ewen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review58575
-----------------------------------------------------------
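To make the three cases in Ewen's reply concrete, here is a minimal Java sketch of how a connectionDelay-style method could behave. The class ConnectionDelaySketch, its State enum, reconnectBackoffMs, and the canSendMore flag (a stand-in for inFlightRequests.canSendMore()) are hypothetical simplifications, not the code in NetworkClient or ClusterConnectionStates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-node connection state. It is NOT Kafka's
// ClusterConnectionStates; it only illustrates the delay semantics described in the reply.
class ConnectionDelaySketch {
    enum State { DISCONNECTED, CONNECTING, CONNECTED }

    static final class NodeState {
        State state = State.DISCONNECTED;
        long lastConnectAttemptMs = -1;  // -1 means "never attempted"
        boolean canSendMore = true;      // stand-in for inFlightRequests.canSendMore()
    }

    private final Map<String, NodeState> nodes = new HashMap<>();
    private final long reconnectBackoffMs;

    ConnectionDelaySketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    private NodeState node(String id) {
        return nodes.computeIfAbsent(id, k -> new NodeState());
    }

    void connecting(String id, long nowMs) {
        NodeState s = node(id);
        s.state = State.CONNECTING;
        s.lastConnectAttemptMs = nowMs;
    }

    void connected(String id) { node(id).state = State.CONNECTED; }

    void disconnected(String id) { node(id).state = State.DISCONNECTED; }

    void setCanSendMore(String id, boolean canSendMore) { node(id).canSendMore = canSendMore; }

    /** Ready means connected with room for another in-flight request. */
    boolean ready(String id) {
        NodeState s = node(id);
        return s.state == State.CONNECTED && s.canSendMore;
    }

    /** How long the poll loop may sleep before this not-ready node is worth re-checking. */
    long connectionDelay(String id, long nowMs) {
        NodeState s = node(id);
        if (s.state == State.DISCONNECTED) {
            if (s.lastConnectAttemptMs < 0) return 0;  // never tried: may connect right away
            // Only the passage of time (the reconnect backoff) changes this case.
            long waitedMs = nowMs - s.lastConnectAttemptMs;
            return Math.max(reconnectBackoffMs - waitedMs, 0);
        }
        // CONNECTING, or CONNECTED but unable to send more: only an external event (the
        // connection completing or failing, or a response arriving) changes this, and that
        // event wakes the poll loop, so no timer is needed.
        return Long.MAX_VALUE;
    }
}
```

Returning Long.MAX_VALUE lets a caller take a plain minimum across nodes: the states that need an external event simply never shorten the poll timeout, which is what avoids the busy waiting Ewen mentions.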
Re: Review Request 26885: Patch for KAFKA-1642
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review58575
-----------------------------------------------------------


Ship it!


LGTM, with one minor comment below.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/26885/#comment99633

    The comments "When connecting or connected, this handles slow/stalled connections" here are a bit misleading: after checking the code I realize connectionDelay is only triggered to determine the delay in milliseconds after which we can re-check connectivity for a node that is not connected, and hence if the node is connected again while we are determining its delay, we just set it to MAX. Instead of making it general to the KafkaClient interface, shall we just add this to the code block of line 155?


- Guozhang Wang
Re: Review Request 26885: Patch for KAFKA-1642
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review57513
-----------------------------------------------------------


Thanks for the patch. Looks good to me. Some minor comments below.


clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
https://reviews.apache.org/r/26885/#comment98242

    It's probably better to put them on two lines.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
https://reviews.apache.org/r/26885/#comment98234

    It seems that expired can just be timeLeftMs <= 0?


clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
https://reviews.apache.org/r/26885/#comment98236

    Are these comments redundant now, given the new comments above?


clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
https://reviews.apache.org/r/26885/#comment98388

    It seems that in this case, the nextReadyCheckDelayMs should be the remaining linger time for tp1, which is lingerMs/2. Should we just assert that?


- Jun Rao
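Jun's second comment suggests that the expired flag can be derived from timeLeftMs. The small Java sketch below checks that the two formulations agree, assuming timeLeftMs is clamped at zero as Math.max(timeToWaitMs - waitedTimeMs, 0); that expression is an assumption here, since the patch's exact code is not shown in this thread, and the class and method names are hypothetical.

```java
// Hypothetical helpers comparing two ways of writing the "timeout expired" check.
// Assumes timeLeftMs is clamped at zero; variable names follow the thread, the class does not.
final class ExpiryCheckSketch {
    private ExpiryCheckSketch() {}

    /** Remaining wait, never negative. */
    static long timeLeftMs(long waitedTimeMs, long timeToWaitMs) {
        return Math.max(timeToWaitMs - waitedTimeMs, 0);
    }

    /** Expiry as a direct comparison of elapsed time against the required wait. */
    static boolean expiredByComparison(long waitedTimeMs, long timeToWaitMs) {
        return waitedTimeMs >= timeToWaitMs;
    }

    /** Expiry derived from the clamped remaining time, as suggested in the review. */
    static boolean expiredByTimeLeft(long waitedTimeMs, long timeToWaitMs) {
        return timeLeftMs(waitedTimeMs, timeToWaitMs) <= 0;
    }
}
```

Because the clamped timeLeftMs never goes below zero, comparing it with <= 0 or == 0 gives the same answer; deriving expired from it keeps a single source of truth for the remaining wait.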
Re: Review Request 26885: Patch for KAFKA-1642
> On Oct. 23, 2014, 9:43 p.m., Jun Rao wrote:
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java, lines 197-199
> https://reviews.apache.org/r/26885/diff/2/?file=726776#file726776line197
>
>     It seems that in this case, the nextReadyCheckDelayMs should be the remaining linger time for tp1, which is lingerMs/2. Should we just assert that?

tp1 and tp2 have the same leader, node1. The test adds enough data to make tp2 sendable, so in the ideal case only tp3 would be used to determine the timeout, which should require lingerMs more time. However, the test checks for <= lingerMs because a single scan through the topic partitions means that we can incorporate the lingerMs/2 timeout from tp1 even though we determine later that we really want to ignore it (and I actually saw this happen when I initially wrote the test to check for the exact value). I think the tradeoff of sometimes waking up a bit earlier than needed is probably worthwhile, since it keeps the implementation simpler and cheaper.


- Ewen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review57513
-----------------------------------------------------------
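Ewen's point about a single scan through the topic partitions can be illustrated with a small Java model of the test scenario: tp1 and tp2 share leader node1, tp2 is sendable, tp1 has half the linger time left, and tp3 on node2 has the full linger time left. The model assumes (an assumption of this sketch, not a description of RecordAccumulator) that the scan skips partitions whose leader has already been marked ready, so the resulting delay depends on iteration order. All names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-pass readiness scan; a simplified model, not RecordAccumulator.ready().
final class SingleScanSketch {
    private SingleScanSketch() {}

    record Partition(String name, String leader, boolean sendable, long timeLeftMs) {}

    record Result(Set<String> readyNodes, long nextReadyCheckDelayMs) {}

    static Result scan(List<Partition> partitions) {
        Set<String> readyNodes = new LinkedHashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        for (Partition p : partitions) {
            // Assumption of this sketch: once a leader is ready, its other partitions are skipped.
            if (readyNodes.contains(p.leader())) continue;
            if (p.sendable()) {
                readyNodes.add(p.leader());
            } else {
                // This delay is kept even if the same leader is marked ready later in the scan.
                nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, p.timeLeftMs());
            }
        }
        return new Result(readyNodes, nextReadyCheckDelayMs);
    }
}
```

With lingerMs = 10, scanning in the order tp1, tp2, tp3 marks node1 ready and yields a delay of 5 (tp1's leftover linger time), whereas the order tp2, tp1, tp3 yields 10. Both satisfy <= lingerMs, which is why an exact-value assertion would be order-sensitive while the bound is not.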
Re: Review Request 26885: Patch for KAFKA-1642
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/
-----------------------------------------------------------

(Updated Oct. 23, 2014, 11:19 p.m.)


Review request for kafka.


Bugs: KAFKA-1642
    https://issues.apache.org/jira/browse/KAFKA-1642


Repository: kafka


Description (updated)
---------------------

Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator:

1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. whether their connection was still good), which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only from data that cannot be sent immediately, i.e. data whose timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again.

2. One of the conditions indicating whether data is sendable is whether a timeout has expired: either the linger time or the retry backoff. This condition wasn't accounting for both cases properly and always used the linger time, which means the retry backoff was probably not being respected.

KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period.

Addressing Jun's comments.
Diffs (updated)
---------------

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9
  clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14

Diff: https://reviews.apache.org/r/26885/diff/


Testing
-------


Thanks,

Ewen Cheslack-Postava
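Point 1 of the description can be sketched as the Sender-side choice of poll timeout. In this minimal Java sketch, PollTimeoutSketch, clientReady, and connectionDelayMs are hypothetical stand-ins for the accumulator result and the KafkaClient calls discussed in the thread; folding the not-ready nodes' connection delay into the timeout is an assumption based on the connectionDelay discussion in this review, not a quote of the patch.

```java
import java.util.Collection;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;

// Hypothetical sketch of choosing the poll timeout after filtering nodes; not Sender's code.
final class PollTimeoutSketch {
    private PollTimeoutSketch() {}

    /**
     * @param nodesWithSendableData nodes whose queued data is sendable (full, lingered, or done backing off)
     * @param nextReadyCheckDelayMs min remaining wait over data that is NOT yet sendable
     * @param clientReady           whether the client can send to a node right now
     * @param connectionDelayMs     how long until a not-ready node is worth re-checking
     */
    static long pollTimeout(Collection<String> nodesWithSendableData, long nextReadyCheckDelayMs,
                            Predicate<String> clientReady, ToLongFunction<String> connectionDelayMs) {
        boolean anyReady = false;
        long notReadyTimeoutMs = Long.MAX_VALUE;
        for (String node : nodesWithSendableData) {
            if (clientReady.test(node)) {
                anyReady = true; // requests for this node would be built and sent here
            } else {
                notReadyTimeoutMs = Math.min(notReadyTimeoutMs, connectionDelayMs.applyAsLong(node));
            }
        }
        // Something was sendable: loop again immediately.
        if (anyReady) return 0;
        // Nothing could be sent: sleep until the earliest linger/backoff expiry or reconnect backoff.
        // A node that has sendable data but is unreachable no longer forces a 0 timeout.
        return Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
    }
}
```

For example, pollTimeout(List.of("n1"), Long.MAX_VALUE, n -> false, n -> 50L) returns 50: the node has sendable data but is in a connection backoff period, so the loop sleeps for the remaining backoff instead of spinning with a 0 timeout.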
Re: Review Request 26885: Patch for KAFKA-1642
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/
-----------------------------------------------------------

(Updated Oct. 21, 2014, 12:34 a.m.)


Review request for kafka.


Bugs: KAFKA-1642
    https://issues.apache.org/jira/browse/KAFKA-1642


Repository: kafka


Description (updated)
---------------------

Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator:

1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. whether their connection was still good), which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only from data that cannot be sent immediately, i.e. data whose timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again.

2. One of the conditions indicating whether data is sendable is whether a timeout has expired: either the linger time or the retry backoff. This condition wasn't accounting for both cases properly and always used the linger time, which means the retry backoff was probably not being respected.

KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period.
Diffs (updated)
---------------

  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9
  clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14

Diff: https://reviews.apache.org/r/26885/diff/


Testing
-------


Thanks,

Ewen Cheslack-Postava
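Point 2 concerns which wait applies to a batch. The following minimal Java sketch picks the retry backoff for a batch that is being retried and is still inside its backoff window, and the linger time otherwise. BatchReadinessSketch and its parameters are hypothetical; the sketch assumes lastAttemptMs holds the creation time for a batch that has never been sent, and it ignores any other sendability conditions a real accumulator may check.

```java
// Hypothetical per-batch readiness check; a sketch, not RecordAccumulator's code.
// Assumes lastAttemptMs is the creation time for a batch that has never been sent.
final class BatchReadinessSketch {
    private BatchReadinessSketch() {}

    record Check(boolean sendable, long timeLeftMs) {}

    static Check check(int attempts, long lastAttemptMs, long nowMs, boolean full,
                       long lingerMs, long retryBackoffMs) {
        // A retried batch still inside its backoff window must wait out the retry backoff.
        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        // Pick the wait that actually applies, instead of always using the linger time.
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        long waitedTimeMs = nowMs - lastAttemptMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        // A full batch may skip the linger time, but not the retry backoff.
        boolean sendable = (full || expired) && !backingOff;
        long timeLeftMs = sendable ? 0 : Math.max(timeToWaitMs - waitedTimeMs, 0);
        return new Check(sendable, timeLeftMs);
    }
}
```

With lingerMs = 0 and retryBackoffMs = 100, a batch that failed at 100 ms is not sendable at 130 ms and reports 70 ms left; a check that consulted only the linger time would treat it as expired and resend it 30 ms after the failure.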
Review Request 26885: Patch for KAFKA-1642
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/
-----------------------------------------------------------


Review request for kafka.


Bugs: KAFKA-1642
    https://issues.apache.org/jira/browse/KAFKA-1642


Repository: kafka


Description
-----------

Fixes two issues with the computation of poll timeouts in Sender/RecordAccumulator.

First, the timeout was being computed by RecordAccumulator as it looked up which nodes had data to send. However, the timeout cannot be computed until after nodes that aren't ready for sending have been filtered out; otherwise a node that is currently unreachable could always produce a timeout of 0 and trigger a busy loop. The fixed version computes per-node timeouts and only computes the final timeout after nodes that aren't ready for sending are removed.

Second, timeouts were only being computed based on the first TopicAndPartition encountered for each node. This could result in incorrect timeouts if the first one encountered didn't have the minimum timeout for that node. The patch now evaluates every TopicAndPartition with a known leader and takes the minimum.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14

Diff: https://reviews.apache.org/r/26885/diff/


Testing
-------


Thanks,

Ewen Cheslack-Postava
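The first revision's two fixes (per-node timeouts that take the minimum over every partition with a known leader, and a final timeout computed only after dropping nodes that aren't ready) might look roughly like this minimal Java sketch. PerNodeTimeoutSketch, PartitionDelay, and the convention that a null leader means "unknown" are hypothetical, not taken from the patch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the first revision's approach; not the code under review.
final class PerNodeTimeoutSketch {
    private PerNodeTimeoutSketch() {}

    /** Remaining wait for one partition; a null leader means "unknown" in this sketch. */
    record PartitionDelay(String partition, String leader, long delayMs) {}

    /** Minimum delay per leader over every partition, not just the first one seen per node. */
    static Map<String, Long> perNodeDelays(List<PartitionDelay> partitions) {
        Map<String, Long> delays = new HashMap<>();
        for (PartitionDelay p : partitions) {
            if (p.leader() == null) continue; // unknown leader: no node to attribute the delay to
            delays.merge(p.leader(), p.delayMs(), Long::min);
        }
        return delays;
    }

    /** Final timeout, computed only after excluding nodes that are not ready for sending. */
    static long finalTimeout(Map<String, Long> perNodeDelays, Set<String> notReadyNodes) {
        long timeoutMs = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : perNodeDelays.entrySet()) {
            if (!notReadyNodes.contains(e.getKey())) timeoutMs = Math.min(timeoutMs, e.getValue());
        }
        return timeoutMs;
    }
}
```

With tp1 (node1, 10 ms), tp2 (node1, 3 ms) and tp3 (node2, 0 ms), node1's delay is 3 even though tp1 is seen first, and excluding the unreachable node2 gives a final timeout of 3 rather than the 0 that would cause a busy loop.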