Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 30, 2015, 5:55 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; minor fixes Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala f56096b826bdbf760411a54ba067a6a83eca8a10 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
> On June 30, 2015, 10:24 p.m., Guozhang Wang wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 933-937
> https://reviews.apache.org/r/34789/diff/14/?file=996120#file996120line933
>
> Wondering why we want to fetch for all assigned partitions if the requested partitions is indeed assigned here?

I think this is how the old code was written, but I was wondering the same thing. I thought perhaps it was an optimization: since we have to hit the coordinator anyway, we may as well update all our assigned offsets in case they are stale as well.

> On June 30, 2015, 10:24 p.m., Guozhang Wang wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1151-1153
> https://reviews.apache.org/r/34789/diff/14/?file=996120#file996120line1151
>
> This comment seems outdated, or otherwise a bit confusing.

I think the comment is still accurate. It's basically just saying that we only fetch if the cache is dirty (as indicated by subscriptions.refreshCommitsNeeded()). I'll try to make it clearer.

> On June 30, 2015, 10:24 p.m., Guozhang Wang wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1249-1256
> https://reviews.apache.org/r/34789/diff/14/?file=996120#file996120line1249
>
> If the coordinator is not available, would this async commit also be blocked here? Or will this be resolved after we introduce the delayed queue in KAFKA-2123?

Yes, that's right, and note that this is consistent with the behavior before this patch (it's just a bit more explicit now). The major problem is that we don't have any data structure to keep track of needed requests, so if the coordinator is not available and we don't want to block to wait for it, then all we can do is discard the commit. I think the delayed queue may be what we need to solve this problem, since we can just reschedule the commit for a later time if the coordinator is not available.

At the same time, I feel a little wary about having a backlog of commits that pile up when the coordinator is down. I'd almost rather just fail the commit and let it be retried on the next interval, but that has disadvantages as well.

- Jason

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review89971
---
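The "reschedule the commit for a later time" idea in the reply above can be pictured with a small sketch. This is only an illustration under stated assumptions: `DelayedTaskSketch`, `RetryingCommitSketch`, the `coordinatorAvailable` flag and the backoff value are all hypothetical names, and nothing here is taken from the patch or from KAFKA-2123. A commit task that finds the coordinator unavailable puts itself back on a time-ordered queue instead of blocking or being dropped.

```java
import java.util.PriorityQueue;

// Hypothetical time-ordered task queue; not the structure from KAFKA-2123.
final class DelayedTaskSketch {
    interface Task {
        void run(long nowMs);
    }

    private static final class Entry implements Comparable<Entry> {
        final long dueMs;
        final Task task;

        Entry(long dueMs, Task task) {
            this.dueMs = dueMs;
            this.task = task;
        }

        @Override
        public int compareTo(Entry other) {
            return Long.compare(dueMs, other.dueMs);
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>();

    void schedule(Task task, long dueMs) {
        queue.add(new Entry(dueMs, task));
    }

    // Runs every task due at or before nowMs and returns how many ran.
    int poll(long nowMs) {
        int ran = 0;
        while (!queue.isEmpty() && queue.peek().dueMs <= nowMs) {
            queue.poll().task.run(nowMs);
            ran++;
        }
        return ran;
    }

    int pending() {
        return queue.size();
    }
}

// Hypothetical commit task: sends if the coordinator is up, otherwise retries later.
final class RetryingCommitSketch implements DelayedTaskSketch.Task {
    private final DelayedTaskSketch tasks;
    private final long retryBackoffMs;
    boolean coordinatorAvailable = false; // assumption: set by some connection logic
    int commitsSent = 0;

    RetryingCommitSketch(DelayedTaskSketch tasks, long retryBackoffMs) {
        if (retryBackoffMs <= 0)
            throw new IllegalArgumentException("retryBackoffMs must be positive");
        this.tasks = tasks;
        this.retryBackoffMs = retryBackoffMs;
    }

    @Override
    public void run(long nowMs) {
        if (coordinatorAvailable)
            commitsSent++; // stand-in for actually sending the commit request
        else
            tasks.schedule(this, nowMs + retryBackoffMs); // strictly in the future
    }
}
```

Because a single task reschedules itself, at most one entry per pending commit sits in the queue, which is one possible way to limit the backlog concern raised above; whether that trade-off is right is exactly the open question in the thread.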
Re: Review Request 34789: Patch for KAFKA-2168
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review89971
---

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (lines 933 - 937)
https://reviews.apache.org/r/34789/#comment142876

Wondering why we want to fetch for all assigned partitions if the requested partitions is indeed assigned here?

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (lines 1149 - 1151)
https://reviews.apache.org/r/34789/#comment142873

This comment seems outdated, or otherwise a bit confusing.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (lines 1247 - 1254)
https://reviews.apache.org/r/34789/#comment142874

If the coordinator is not available, would this async commit also be blocked here? Or will this be resolved after we introduce the delayed queue in KAFKA-2123?

One minor comment that is out of the scope of this follow-up patch: could we rename Coordinator.handleOffsetResponse to handleFetchOffsetResponse?

- Guozhang Wang
Re: Review Request 34789: Patch for KAFKA-2168
> On June 23, 2015, 4:20 a.m., Jun Rao wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 355
> https://reviews.apache.org/r/34789/diff/12/?file=990006#file990006line355
>
> What's the logic to initiate connection to coordinator if the coordinator is not available during HB?

As it's currently written, we'd skip a heartbeat if we don't have an active connection to the coordinator. As long as the heartbeat frequency is 3 or more times per session timeout, this is probably ok, but we might want to handle it better if we end up exposing the heartbeat frequency in configuration (currently it's hard-coded). Perhaps we can fix this in a separate ticket?

> On June 23, 2015, 4:20 a.m., Jun Rao wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 351-354
> https://reviews.apache.org/r/34789/diff/12/?file=990003#file990003line351
>
> These seem redundant given the code below.

I think it's still necessary to call wakeup to abort a long poll if you want to ensure timely shutdown. You could probably get away without the closed flag and just use the ConsumerWakeupException to close the consumer, but the explicit flag seems cleaner.

> On June 23, 2015, 4:20 a.m., Jun Rao wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 436
> https://reviews.apache.org/r/34789/diff/12/?file=990003#file990003line436
>
> Should this be volatile so that different threads can see the latest value of refcount?

I think you are right. Fixed in latest patch.

- Jason

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88914
---

On June 23, 2015, 4:39 p.m., Jason Gustafson wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/
---
(Updated June 23, 2015, 4:39 p.m.)
Review request for kafka.
Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; make refcount in KafkaConsumer an AtomicInteger Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
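On the refcount question, the follow-up description says the field became an AtomicInteger rather than a volatile int. A minimal sketch of why that can matter, assuming the counter is updated with an increment: volatile makes the latest value visible to other threads, but an increment on a volatile int is still a separate read and write, whereas AtomicInteger makes the whole update atomic. `RefCountSketch` and `hammer` are hypothetical names for illustration, not code from KafkaConsumer.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountSketch {
    // Hypothetical stand-in for the refcount field discussed in the review.
    private final AtomicInteger refcount = new AtomicInteger(0);

    int acquire() {
        return refcount.incrementAndGet(); // atomic read-modify-write
    }

    int release() {
        return refcount.decrementAndGet();
    }

    int current() {
        return refcount.get();
    }

    // Starts `threads` threads that each call acquire() `perThread` times,
    // waits for them, and returns the final count.
    static int hammer(int threads, int perThread) {
        RefCountSketch counter = new RefCountSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    counter.acquire();
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.current();
    }
}
```

With an atomic counter, `RefCountSketch.hammer(4, 10000)` cannot lose updates; the same loop over a plain or volatile int with `++` could.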
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 23, 2015, 4:39 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; make refcount in KafkaConsumer an AtomicInteger Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88796 --- Ship it! I did not review it thoroughly but the design looks clean to me. Great work! I think we can check it in to unblock other JIRAs, and come back to it when necessary in the future for any follow-up work. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java (lines 15 - 18) https://reviews.apache.org/r/34789/#comment141379 We would like to have a serialVersionUID for any classes extending Serializable. You can take a look at ConfigException for example. - Guozhang Wang On June 19, 2015, 4:19 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 19, 2015, 4:19 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately KAFKA-2168; address review comments KAFKA-2168; address more review comments KAFKA-2168; updated for review comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 951c34c92710fc4b38d656e99d2a41255c60aeb7 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 41cb9458f51875ac9418fce52f264b35adba92f4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/main/java/org/apache/kafka/common/utils/Utils.java f73eedb030987f018d8446bb1dcd98d19fa97331 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 1454ab73df22cce028f41f74b970628829da4e9d clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
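The serialVersionUID request above is a general Java convention: exceptions are Serializable through Throwable, so an explicit id keeps the serialized form from depending on a compiler-generated default. A minimal sketch follows; `WakeupSketchException` and `SerialRoundTrip` are hypothetical names, and the value `1L` is an arbitrary choice, not the one used in ConsumerWakeupException or ConfigException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical exception class; RuntimeException is Serializable via Throwable.
class WakeupSketchException extends RuntimeException {
    // Explicit id so compatible class changes do not alter the stream version.
    private static final long serialVersionUID = 1L;

    WakeupSketchException(String message) {
        super(message);
    }
}

final class SerialRoundTrip {
    // Serializes a value to bytes and reads it back, as a quick sanity check.
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```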
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 22, 2015, 11:35 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately KAFKA-2168; address review comments KAFKA-2168; address more review comments KAFKA-2168; updated for review comments KAFKA-2168; add serialVersionUID to ConsumerWakeupException Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 951c34c92710fc4b38d656e99d2a41255c60aeb7 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 41cb9458f51875ac9418fce52f264b35adba92f4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 
cee75410127dd1b86c1156563003216d93a086b3 clients/src/main/java/org/apache/kafka/common/utils/Utils.java f73eedb030987f018d8446bb1dcd98d19fa97331 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 1454ab73df22cce028f41f74b970628829da4e9d clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88914
---

Ship it!

Thanks for the latest patch. Looks good overall. To avoid holding on to this relatively large patch for too long, I have committed the latest patch to trunk. There are a few minor comments below, and we can commit any necessary fixes in a follow-up patch.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (lines 335 - 338)
https://reviews.apache.org/r/34789/#comment141530

These seem redundant given the code below.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 420)
https://reviews.apache.org/r/34789/#comment141531

Should this be volatile so that different threads can see the latest value of refcount?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 319)
https://reviews.apache.org/r/34789/#comment141529

What's the logic to initiate connection to coordinator if the coordinator is not available during HB?

- Jun Rao
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 19, 2015, 4:19 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately KAFKA-2168; address review comments KAFKA-2168; address more review comments KAFKA-2168; updated for review comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 951c34c92710fc4b38d656e99d2a41255c60aeb7 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 41cb9458f51875ac9418fce52f264b35adba92f4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 
clients/src/main/java/org/apache/kafka/common/utils/Utils.java f73eedb030987f018d8446bb1dcd98d19fa97331 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 1454ab73df22cce028f41f74b970628829da4e9d clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
> On June 18, 2015, 9:59 p.m., Jay Kreps wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1364
> https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1364
>
> This seems like one of these things that is clever but invariably ends up not quite working. Did we actually determine there is a performance hit from just synchronizing? Biased locking and lock elision should make this very cheap, right? Given we acquire locks for all the perf stats and there are several stat updates for each poll call, I don't see the problem with just synchronizing.
>
> Ewen Cheslack-Postava wrote:
> Adding synchronization has at least one other downside besides performance - you can pretty easily end up deadlocking due to callbacks (consumer rebalance, commit) if you also have other synchronization outside the consumer itself. And if someone has a chance of accessing the consumer from multiple threads, that probably does mean they have some other synchronization (or should). Deadlocks are generally easier to diagnose than synchronization bugs, but you're introducing potential issues either way.
>
> Based on previous conversation, I think this addresses the main concern with the unsynchronized version and actually results in a lot *less* unintuitive behavior than the synchronized version. If I'm already unaware of the fact that I am incorrectly calling the consumer from multiple threads, using the synchronized version is just going to make me think commits/seeks/etc are running really slow as I wait for poll() calls to return, and then blame the library because as far as I know I'm using it correctly and it's just not behaving well.
>
> This solution seems a lot better because although it is not guaranteed to catch conflicts, you're a lot more likely to hit them, they get turned into actual exceptions, and the cause is made very clear to you -- it specifically puts the blame on the code calling these methods and indicates that the caller has a bug in their code since they have not properly synchronized access to the consumer. A real race detector is more likely to pick up these errors, but this is a pretty good way to have a fair chance of catching the error and informing the user, especially since the most likely error is calling a method while a poll() is running, and those should generally have pretty long timeouts. Having to litter the code with try/finally blocks is a definite drawback, but just encourages us to keep the API as small as possible :)

As Ewen said, I think the main advantage is that it forces users into the single-threaded access model that we want them to use. However, if we think this implementation is a bit too clever (and if we aren't overly concerned about the potential for deadlock in the callback), then we could use a traditional ReentrantLock and get the same behavior by using tryLock(). I doubt the performance impact would be significant.

- Jason

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88441
---

On June 18, 2015, 9:40 p.m., Jason Gustafson wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/
---
(Updated June 18, 2015, 9:40 p.m.)
Review request for kafka.
Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately KAFKA-2168; address review comments KAFKA-2168; address more review comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 951c34c92710fc4b38d656e99d2a41255c60aeb7 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
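The ReentrantLock alternative mentioned in the reply above could look roughly like the following. This is a sketch under assumptions, not code from the patch: `TryLockGuardedClient`, its `seek` and `poll` methods, and the exception message are hypothetical. `tryLock()` returns immediately, so a second thread gets an exception instead of waiting, while the owning thread can re-enter from a callback.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical client that rejects concurrent callers instead of blocking them.
final class TryLockGuardedClient {
    private final ReentrantLock lock = new ReentrantLock();
    private long position = 0L;

    private void acquire() {
        // tryLock() never waits: it succeeds for the holder (re-entrant) or a free lock.
        if (!lock.tryLock())
            throw new ConcurrentModificationException("client is not safe for multi-threaded access");
    }

    private void release() {
        lock.unlock();
    }

    long seek(long offset) {
        acquire();
        try {
            position = offset;
            return position;
        } finally {
            release();
        }
    }

    // Holds the guard while running the callback, which may call back into this client.
    void poll(Runnable callback) {
        acquire();
        try {
            callback.run();
        } finally {
            release();
        }
    }
}

final class TryLockGuardDemo {
    // Returns true if a second thread calling seek() during poll() was rejected.
    static boolean secondThreadRejected() {
        TryLockGuardedClient client = new TryLockGuardedClient();
        boolean[] rejected = {false};
        client.poll(() -> {
            client.seek(5L); // same thread: re-entrant, allowed
            Thread other = new Thread(() -> {
                try {
                    client.seek(42L);
                } catch (ConcurrentModificationException e) {
                    rejected[0] = true;
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return rejected[0];
    }
}
```

Every public method needs the same try/finally pairing, which is the drawback noted earlier in the thread.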
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 18, 2015, 9:40 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately KAFKA-2168; address review comments KAFKA-2168; address more review comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 951c34c92710fc4b38d656e99d2a41255c60aeb7 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 41cb9458f51875ac9418fce52f264b35adba92f4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
f73eedb030987f018d8446bb1dcd98d19fa97331 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 1454ab73df22cce028f41f74b970628829da4e9d clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88441
---

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 1357)
https://reviews.apache.org/r/34789/#comment140961

This seems like one of these things that is clever but invariably ends up not quite working. Did we actually determine there is a performance hit from just synchronizing? Biased locking and lock elision should make this very cheap, right? Given we acquire locks for all the perf stats and there are several stat updates for each poll call, I don't see the problem with just synchronizing.

- Jay Kreps
Re: Review Request 34789: Patch for KAFKA-2168
On June 18, 2015, 9:59 p.m., Jay Kreps wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1364
https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1364

This seems like one of these things that is clever but invariably ends up not quite working. Did we actually determine there is a performance hit from just synchronizing? Biased locking and lock elision should make this very cheap, right? Given we acquire locks for all the perf stats and there are several stat updates for each poll call, I don't see the problem with just synchronizing.

Adding synchronization has at least one other downside besides performance - you can pretty easily end up deadlocking due to callbacks (consumer rebalance, commit) if you also have other synchronization outside the consumer itself. And if someone has a chance of accessing the consumer from multiple threads, that probably does mean they have some other synchronization (or should). Deadlocks are generally easier to diagnose than synchronization bugs, but you're introducing potential issues either way.

Based on previous conversation, I think this addresses the main concern with the unsynchronized version and actually results in a lot *less* unintuitive behavior than the synchronized version. If I'm already unaware of the fact that I am incorrectly calling the consumer from multiple threads, using the synchronized version is just going to make me think commits/seeks/etc are running really slow as I wait for poll() calls to return, and then blame the library because as far as I know I'm using it correctly and it's just not behaving well.

This solution seems a lot better because although it is not guaranteed to catch conflicts, you're a lot more likely to hit them, they get turned into actual exceptions, and the cause is made very clear to you -- it specifically puts the blame on the code calling these methods and indicates that the caller has a bug in their code since they have not properly synchronized access to the consumer. A real race detector is more likely to pick up these errors, but this is a pretty good way to have a fair chance of catching the error and informing the user, especially since the most likely error is calling a method while a poll() is running, and those should generally have pretty long timeouts.

Having to litter the code with try/finally blocks is a definite drawback, but just encourages us to keep the API as small as possible :)

- Ewen

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88441
---
Re: Review Request 34789: Patch for KAFKA-2168
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88451
---

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 725)
https://reviews.apache.org/r/34789/#comment140985

Might help readers to explain that this pipelines the next request with the user's processing of the current set of records.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 1344)
https://reviews.apache.org/r/34789/#comment140986

Used only once? This looks like it should either be done inline, or is a generic utility that should end up in Utils instead.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 1372)
https://reviews.apache.org/r/34789/#comment140984

This doesn't handle nested calls properly, e.g. poll() - callback - seek/commit/etc. You may need currentThread and a refcount so you only clear currentThread when the refcount hits 0.

- Ewen Cheslack-Postava
Re: Review Request 34789: Patch for KAFKA-2168
On June 18, 2015, 11:13 p.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1379
https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1379

This doesn't handle nested calls properly, e.g. poll() - callback - seek/commit/etc. You may need currentThread and a refcount so you only clear currentThread when the refcount hits 0.

You're totally right (blush). Do you think it's worth pulling this functionality into a separate class?

- Jason

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88451
---
Re: Review Request 34789: Patch for KAFKA-2168
On June 18, 2015, 11:13 p.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1379
https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1379

This doesn't handle nested calls properly, e.g. poll() - callback - seek/commit/etc. You may need currentThread and a refcount so you only clear currentThread when the refcount hits 0.

Jason Gustafson wrote:

You're totally right (blush). Do you think it's worth pulling this functionality into a separate class?

I probably wouldn't until we have a reason to reuse it. It's only 2 fields and 2 methods.

- Ewen

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review88451
---
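For readers following the thread, the "2 fields and 2 methods" idea (remember the owning thread, keep a refcount, and clear the owner only when the count returns to 0) might look roughly like the sketch below. This is an illustration, not the code from the patch: the class name `ThreadAccessGuard`, the method names, and the choice of `ConcurrentModificationException` are all assumptions made for the example.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not from the patch): detects, but does not prevent,
// concurrent use of an object that is meant to be used by one thread at a time.
final class ThreadAccessGuard {
    private static final long NO_OWNER = -1L;

    // Field 1: id of the thread currently inside a guarded call, or NO_OWNER.
    private final AtomicLong ownerId = new AtomicLong(NO_OWNER);
    // Field 2: nesting depth of guarded calls made by the owning thread.
    private final AtomicInteger refCount = new AtomicInteger(0);

    // Method 1: call on entry to every public method.
    void acquire() {
        long me = Thread.currentThread().getId();
        // Re-entry by the owner is allowed; otherwise ownership must be free.
        if (me != ownerId.get() && !ownerId.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "object is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    // Method 2: call in a finally block on exit from every public method.
    void release() {
        // Only the outermost call gives up ownership.
        if (refCount.decrementAndGet() == 0) {
            ownerId.set(NO_OWNER);
        }
    }

    int depth() {
        return refCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadAccessGuard guard = new ThreadAccessGuard();
        guard.acquire();   // e.g. entering poll()
        guard.acquire();   // e.g. commit() invoked from a callback inside poll()
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                System.out.println("unexpected: second thread got in");
            } catch (ConcurrentModificationException e) {
                System.out.println("second thread rejected");
            }
        });
        other.start();
        other.join();
        guard.release();
        guard.release();   // ownership is cleared here, not on the inner release
        System.out.println("depth after release: " + guard.depth());
    }
}
```

Without the refcount, the inner release() would clear the owner while the outer call is still running, which is the nested-call problem raised in the review. Note that the check is best-effort: it turns most misuse into an exception but is not a substitute for a lock.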
Re: Review Request 34789: Patch for KAFKA-2168
On June 15, 2015, 6:09 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1199
https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line1199

Doesn't this cause poll() to block for the backoff time?

Generally it wouldn't, but there are a couple of cases where it could. For example, we could be waiting to discover the coordinator, or we might need to fetch some partition offsets. In either case, we could block longer than the poll timeout. Note that this is consistent with the old version. I had hoped to address it as part of solving KAFKA-1894.

On June 15, 2015, 6:09 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 928
https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line928

Could you explain a bit more how wakeup() can be used for commit offsets or seek to a new position? Does a separate thread have to first save some partitions for committing offsets in the poll thread and then wake it up? Then the poll thread will initiate the offset commit before the next poll request?

It's admittedly a little awkward, but you could do it the way you suggest. For example, you could use a flag and check it in the poll thread:

```
while (true) {
  try {
    if (needCommit.get()) {
      consumer.commit(offsets);
      needCommit.set(false);
    }
    records = consumer.poll(5);
    // submit records to executor perhaps
  } catch (ConsumerWakeupException e) {
    // woken up by the other thread; loop around and check the flag
  }
}
```

Then the other thread just needs to set the flag and the offsets that need committing, and wake up the consumer.

Alternatively, you could protect the consumer with a lock and commit the offsets from a separate thread. The difficulty with this solution is that you need to make sure that the polling thread does not simply reacquire the lock after being woken up. This problem can be dealt with by introducing a second lock or perhaps by getting tricky with a semaphore, but I'd probably recommend the previous solution instead since it's easier to understand.
On June 15, 2015, 6:09 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 686-687
https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line686

It's not very clear to me, if the caller is woken up from a poll() call, whether the next poll() call will resume from the exact state where the previous one left off. For example, let's say the first poll() call just issued a consumerMetadataRequest in ensureCoordinatorKnown() and then is woken up. In the next poll() call, do we just wait for the response from the previous consumerMetadataRequest or will it trigger a new consumerMetadataRequest? If it's the latter, will the two responses be processed out of order?

Waking up the consumer can leave requests pending. This could also happen if we change the code to actually respect the poll timeout (and I think this is the main reason why it hasn't been done). I was thinking about wakeup() as primarily a way to get timely shutdown behavior, but if it is also used for commits or seeks, then we have to solve this problem.

I have thought about adding a cancel() operation to the RequestFuture. It wouldn't stop an in-flight request, but you could prevent its callback from being executed. This would probably be good enough to deal with a pending consumerMetadataRequest, but we wouldn't want to leave a JoinGroup request pending since resending it may cause unnecessary rebalancing. Perhaps we shouldn't allow wakeups in those cases? Maybe the only case we actually wake up from is when we are polling for data?

- Jason

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87880
---

On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/
---

(Updated June 11, 2015, 9:10 p.m.)

Review request for kafka.
Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately KAFKA-2168; address review comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
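As a rough illustration of the cancel() idea floated in the reply above (suppress the callback without stopping the in-flight request), a minimal sketch follows. It is not the RequestFuture from the patch; the class name `CancellableFuture`, its methods, and the single-threaded usage are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch (not the patch's RequestFuture). Intended for use from a
// single thread, as the consumer's request handling is in this discussion.
final class CancellableFuture<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private boolean cancelled = false;
    private boolean done = false;
    private T value;

    // Register a callback to run when the response arrives.
    void addListener(Consumer<T> listener) {
        if (cancelled) {
            return;                     // callbacks are suppressed after cancel()
        }
        if (done) {
            listener.accept(value);     // already completed: fire immediately
        } else {
            listeners.add(listener);
        }
    }

    // Does not stop the in-flight request; it only drops the callbacks.
    void cancel() {
        cancelled = true;
        listeners.clear();
    }

    // Called by the network layer when the response is received.
    void complete(T result) {
        if (done) {
            return;
        }
        done = true;
        value = result;
        if (cancelled) {
            return;                     // response arrives, nobody is notified
        }
        for (Consumer<T> listener : listeners) {
            listener.accept(result);
        }
        listeners.clear();
    }

    boolean isCancelled() {
        return cancelled;
    }

    public static void main(String[] args) {
        CancellableFuture<String> pending = new CancellableFuture<>();
        pending.addListener(node -> System.out.println("coordinator: " + node));
        pending.cancel();               // e.g. the caller was woken up
        pending.complete("broker-1");   // late response is silently ignored
        System.out.println("cancelled: " + pending.isCancelled());
    }
}
```

As the reply notes, this kind of suppression only helps for requests that are safe to resend. It would not address a pending JoinGroup, where a duplicate request could trigger an unnecessary rebalance.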
Re: Review Request 34789: Patch for KAFKA-2168
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87880
---

Thanks for the new patch. A few more quick comments.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140366

It's not very clear to me, if the caller is woken up from a poll() call, whether the next poll() call will resume from the exact state where the previous one left off. For example, let's say the first poll() call just issued a consumerMetadataRequest in ensureCoordinatorKnown() and then is woken up. In the next poll() call, do we just wait for the response from the previous consumerMetadataRequest or will it trigger a new consumerMetadataRequest? If it's the latter, will the two responses be processed out of order?

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140293

typo: this do

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140353

Could you explain a bit more how wakeup() can be used for commit offsets or seek to a new position? Does a separate thread have to first save some partitions for committing offsets in the poll thread and then wake it up? Then the poll thread will initiate the offset commit before the next poll request?

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140346

Doesn't this cause poll() to block for the backoff time?

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140345

It seems that the expectation is that if the request times out, any callbacks registered with the previously submitted requests will no longer be called. Perhaps we can add this in the TODO comment.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
https://reviews.apache.org/r/34789/#comment140349

To be consistent with the naming of the request, perhaps it's better to name this listOffset and the next one handleListOffsetResponse.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
https://reviews.apache.org/r/34789/#comment140344

Incomplete comment after but a.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
https://reviews.apache.org/r/34789/#comment140347

This seems never used. Is there a use case for this?

- Jun Rao
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 797-798
https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797

Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the end of the call, we expect the fetch offset to be set to the beginning. This is now changed to async, which doesn't match the intended behavior. We need to think through if this matters or not. Ditto for seekToEnd().

Jason Gustafson wrote:

Since we always update fetch positions before a new fetch and in position(), it didn't seem necessary to make it synchronous. I thought this handling might be more consistent with how new subscriptions are handled (which are asynchronous and defer the initial offset fetch until the next poll or position). That being said, I don't have a strong feeling about it, so we could return to the blocking version.

Jun Rao wrote:

Making this async may be fine. One implication is that if we call position() immediately after seekToBeginning(), we may not be able to get the correct offset.

Jason Gustafson wrote:

We should be able to get the right offset since we always update offsets before returning the current position, but we might have to block for it. It's similar to if you call subscribe(topic) and then try to get its position immediately.

Jun Rao wrote:

That may work. However, if one calls seekToBeginning() followed by seekToEnd(), will we guarantee that position() returns the end offset?

Yes, this will work. The latest seek will overwrite any pending ones.

On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 319-322
https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line319

Could we add an example of how to use the new wakeup() call, especially with closing the consumer properly? For example, does the consumer thread just catch the ConsumerWakeupException and then call close()?

I've added an example in the latest patch.

On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1039-1040
https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039

The returned response may be ready already after the offsetBefore call due to needing metadata refresh. Since we don't check the ready state immediately afterward, we may be delaying the processing of metadata refresh by the request timeout.

Jason Gustafson wrote:

This is a pretty good point. One of the reasons working with NetworkClient is tricky is that you need several polls to complete a request: one to connect, one to send, and one to receive. In this case, the result might not be ready because we are in the middle of connecting to the broker, in which case we need to call poll() to finish the connect. If we don't, then the next request will just fail for the same reason. I'll look to see if there's a way to fix this to avoid unnecessary calls to poll.

I struggled a bit trying to fix this. In the latest patch, I changed the notion of remedy to a retryAction and included polling as one of the possible actions. Then if the result is finished, we only would call poll if the result indicates that it's needed. The only case where I actually use this is when a connection has just been initiated.

- Jason

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190
---
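The example referred to in the reply above lives in the patch itself and is not reproduced in this thread. As a hedged illustration of the shape being asked about (catch the wakeup exception in the poll thread, then close), here is a self-contained sketch. It deliberately does not use the Kafka client: `FakeConsumer` and `WakeupException` are hypothetical stand-ins for the consumer and for ConsumerWakeupException, and the sentinel-based wakeup is only a device to keep the sketch runnable.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class WakeupShutdownSketch {
    // Stand-in for ConsumerWakeupException (hypothetical, for this sketch only).
    static final class WakeupException extends RuntimeException { }

    // Stand-in for the consumer (hypothetical): poll() blocks until a record
    // arrives, the timeout expires, or another thread calls wakeup().
    static final class FakeConsumer {
        // Sentinel compared by identity, so it can never equal a real record.
        private static final String WAKEUP = new String("wakeup");
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private volatile boolean closed = false;

        void send(String record) { queue.add(record); }

        // Safe to call from any thread.
        void wakeup() { queue.add(WAKEUP); }

        String poll(long timeoutMs) throws InterruptedException {
            String next = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (next == WAKEUP) {
                throw new WakeupException();
            }
            return next;   // null on timeout
        }

        void close() { closed = true; }

        boolean isClosed() { return closed; }
    }

    // The poll thread: loop until woken up, then close in a finally block.
    static int runLoop(FakeConsumer consumer) {
        int processed = 0;
        try {
            while (true) {
                String record = consumer.poll(1000);
                if (record != null) {
                    processed++;
                }
            }
        } catch (WakeupException e) {
            // Expected: another thread asked us to stop.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close();
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        FakeConsumer consumer = new FakeConsumer();
        Thread poller = new Thread(() -> runLoop(consumer));
        poller.start();
        consumer.send("a");
        consumer.wakeup();   // request shutdown from the main thread
        poller.join();
        System.out.println("closed: " + consumer.isClosed());
    }
}
```

The point of the pattern is that close() is only ever called from the polling thread, so the shutdown request from another thread needs no extra locking around the consumer.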
Re: Review Request 34789: Patch for KAFKA-2168
On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java, line 15
https://reviews.apache.org/r/34789/diff/3/?file=976967#file976967line15

The classes named XResponse may be a bit confusing because the protocol responses use that terminology. Future? Result?

Jason Gustafson wrote:

Agreed. In fact, they were XResult initially. I changed them because BrokerResult and CoordinatorResult didn't seem to suggest as clearly what they were for as BrokerResponse and CoordinatorResponse. I considered Future as well, but its usage is a bit different than traditional Java Futures. Perhaps XReply?

Ewen Cheslack-Postava wrote:

Even though there's no blocking get(), XFuture might be the clearest. XReply would work, but has a similar issue that it gets confusing whether XResponse or XReply is the actual message received back vs. the processed data that you wanted to extract.

In the latest patch, I changed DelayedResult to RequestFuture. Think that's better?

- Jason

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86338
---

On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/
---

(Updated June 11, 2015, 9:10 p.m.)

Review request for kafka.
Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately KAFKA-2168; address review comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java, line 16 https://reviews.apache.org/r/34789/diff/8/?file=980077#file980077line16 Do we need NONE?

Jason Gustafson wrote: It was there before, but I don't think it's actually used. I'd be fine removing it.

You need it to properly parse the none config value and so there is an OffsetResetStrategy value to indicate that option. It's just not currently used because you handle it with an else rather than an else if.

On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java, line 196 https://reviews.apache.org/r/34789/diff/8/?file=980084#file980084line196 To be consistent with the naming convention with the rest of the methods, should we just name it offsetResetNeeded()?

Jason Gustafson wrote: Haha, I actually used that convention initially, but it was a little confusing at times which method should be used. I can change it back, or we can add the is prefix to the other usages. Preferences?

I think I've seen a number of other places where we're not exactly consistent with this (grep for [.]is for lots of examples). Naming conventions seem to be quite mixed between this type, get/set style, and just bare names. Not sure it's worth worrying about beyond readability issues.

- Ewen

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190 ---

On June 5, 2015, 7:45 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 5, 2015, 7:45 p.m.) Review request for kafka.
Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
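
The point about NONE being handled "with an else rather than an else if" can be illustrated with a small sketch. Everything below is hypothetical: `ResetStrategy`, `parse`, and `resolve` are illustrative names rather than code from the patch, and throwing on NONE is an assumption about what "no reset policy" would mean. It only shows why the enum needs a value for the `none` config string and how an explicit branch per value keeps NONE visible.

```java
import java.util.Locale;

// Hypothetical stand-in for the enum under review; the constants mirror the
// "earliest" / "latest" / "none" config values discussed in the thread.
enum ResetStrategy { EARLIEST, LATEST, NONE }

class ResetStrategyDemo {
    // Assumed helper (not from the patch): maps the config string to the enum.
    // Without a NONE constant, parse("none") would throw IllegalArgumentException.
    static ResetStrategy parse(String configValue) {
        return ResetStrategy.valueOf(configValue.toUpperCase(Locale.ROOT));
    }

    // One explicit branch per value, so NONE is handled by name instead of
    // falling into a bare else.
    static long resolve(ResetStrategy strategy, long beginning, long end) {
        if (strategy == ResetStrategy.EARLIEST) {
            return beginning;
        } else if (strategy == ResetStrategy.LATEST) {
            return end;
        } else if (strategy == ResetStrategy.NONE) {
            // Assumption: with no reset policy there is nothing to fall back to.
            throw new IllegalStateException("no reset policy configured");
        } else {
            throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("none"));
        System.out.println(resolve(parse("earliest"), 0L, 42L));
    }
}
```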
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 6:49 p.m., Jay Kreps wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 322 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line322 Do we need this? There is no real guarantee on the poll time, so it seems that we could just return when wakeup is called.

Jason Gustafson wrote: You might be waking up from a synchronous commit, for example. In that case, all we can do is raise an exception. We could alternatively say that wakeup only applies to the poll() method and cannot be used to interrupt the other calls.

If poll just returns on wakeup, how would the caller know if there is an intention to close the consumer?

- Jun

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87257 ---
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1212 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212 -1 makes the pollClient block forever. So, we don't get a chance to do the wakeup check.

Jason Gustafson wrote: I might be wrong, but I think we can still use NetworkClient.wakeup to interrupt a poll call which is waiting forever.

Jun Rao wrote: Yes, you are right. I missed that.

Actually, this still seems to be a problem. The issue is that when NetworkClient.wakeup is called, NetworkClient.poll may not yet have been called. In that case, the next NetworkClient.poll may still block for the timeout.

- Jun

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190 ---
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 6:49 p.m., Jay Kreps wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 322 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line322 Do we need this? There is no real guarantee on the poll time, so it seems that we could just return when wakeup is called.

Jason Gustafson wrote: You might be waking up from a synchronous commit, for example. In that case, all we can do is raise an exception. We could alternatively say that wakeup only applies to the poll() method and cannot be used to interrupt the other calls.

Jun Rao wrote: If poll just returns on wakeup, how would the caller know if there is an intention to close the consumer?

I think the user would have to use a separate flag to indicate their intention to close. Then their shutdown hook would first set the flag, then call wakeup(). The polling thread then might look like this:

```
KafkaConsumer consumer = new KafkaConsumer();
try {
    consumer.subscribe(foo);
    // closed is the flag set by the shutdown hook before it calls wakeup()
    while (!closed) {
        records = consumer.poll(5000);
        // Do something with the records
    }
} finally {
    consumer.close();
}
```

- Jason

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87257 ---
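
The flag-then-wakeup ordering described in this reply can be tried out without a broker. The sketch below is only an illustration under stated assumptions: `FakeConsumer` is a hypothetical stand-in whose `poll()` returns early when `wakeup()` is called (the "just return" behavior Jay proposed), and `runAndShutdown` plays the role of the shutdown hook. None of these names come from the patch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownFlagDemo {
    // Hypothetical stand-in for a consumer: poll() blocks for up to timeoutMs
    // and returns null early once wakeup() has been called.
    static class FakeConsumer {
        private static final String WAKEUP = "__wakeup__";
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        String poll(long timeoutMs) throws InterruptedException {
            String record = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return WAKEUP.equals(record) ? null : record;
        }

        void wakeup() {
            queue.add(WAKEUP);
        }
    }

    // Starts a polling thread, then shuts it down the way the reply suggests.
    // Returns the elapsed milliseconds, or -1 if the poller did not stop.
    static long runAndShutdown() {
        FakeConsumer consumer = new FakeConsumer();
        AtomicBoolean closed = new AtomicBoolean(false);

        Thread poller = new Thread(() -> {
            try {
                while (!closed.get()) {
                    consumer.poll(5000); // returns early on wakeup
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        long start = System.nanoTime();
        poller.start();

        // Shutdown path: set the flag first, then wake the poller, so the
        // poller always sees closed == true after an early return.
        closed.set(true);
        consumer.wakeup();

        try {
            poller.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return poller.isAlive() ? -1 : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("poller stopped after ~" + runAndShutdown() + " ms");
    }
}
```

Setting the flag before calling `wakeup()` matters: in the opposite order the poller could return from `poll()`, still see `closed == false`, and block for another full timeout.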
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1212 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212 -1 makes the pollClient block forever. So, we don't get a chance to do the wakeup check.

Jason Gustafson wrote: I might be wrong, but I think we can still use NetworkClient.wakeup to interrupt a poll call which is waiting forever.

Yes, you are right. I missed that.

On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 797-798 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797 Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the end of the call, we expect the fetch offset to be set to the beginning. This is now changed to async, which doesn't match the intended behavior. We need to think through if this matters or not. Ditto for seekToEnd().

Jason Gustafson wrote: Since we always update fetch positions before a new fetch and in position(), it didn't seem necessary to make it synchronous. I thought this handling might be more consistent with how new subscriptions are handled (which are asynchronous and defer the initial offset fetch until the next poll or position). That being said, I don't have a strong feeling about it, so we could return to the blocking version.

Making this async may be fine. One implication is that if we call position() immediately after seekToBeginning(), we may not be able to get the correct offset.

- Jun

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190 ---
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1212 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212 -1 makes the pollClient block forever. So, we don't get a chance to do the wakeup check.

Jason Gustafson wrote: I might be wrong, but I think we can still use NetworkClient.wakeup to interrupt a poll call which is waiting forever.

Jun Rao wrote: Yes, you are right. I missed that.

Jun Rao wrote: Actually, this still seems to be a problem. The issue is that when NetworkClient.wakeup is called, NetworkClient.poll may not yet have been called. In that case, the next NetworkClient.poll may still block for the timeout.

From the javadocs for Selector, the wakeup will apply to the next poll if one is not in progress. But perhaps we should just check the wakeup flag before entering the poll to be safe.

On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 797-798 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797 Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the end of the call, we expect the fetch offset to be set to the beginning. This is now changed to async, which doesn't match the intended behavior. We need to think through if this matters or not. Ditto for seekToEnd().

Jason Gustafson wrote: Since we always update fetch positions before a new fetch and in position(), it didn't seem necessary to make it synchronous. I thought this handling might be more consistent with how new subscriptions are handled (which are asynchronous and defer the initial offset fetch until the next poll or position). That being said, I don't have a strong feeling about it, so we could return to the blocking version.

Jun Rao wrote: Making this async may be fine. One implication is that if we call position() immediately after seekToBeginning(), we may not be able to get the correct offset.

We should be able to get the right offset since we always update offsets before returning the current position, but we might have to block for it. It's similar to calling subscribe(topic) and then trying to get its position immediately.

- Jason

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190 ---
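
The Selector behavior cited in this reply can be checked directly against `java.nio.channels.Selector`. The sketch below is a standalone demonstration, not code from the patch: the class and method names are made up, and it assumes only what the thread states, namely that the client's wakeup ends up as a Selector wakeup. It calls `wakeup()` while no select is in progress and then times a `select(5000)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class SelectorWakeupDemo {
    // Calls wakeup() before any select is in progress, then measures how long
    // the following select(5000) takes. Returns the elapsed milliseconds.
    static long selectAfterWakeupMillis() {
        try (Selector selector = Selector.open()) {
            selector.wakeup(); // no selection operation is in progress yet

            long start = System.nanoTime();
            selector.select(5000); // expected to return without waiting 5 s
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("select returned after ~" + selectAfterWakeupMillis() + " ms");
    }
}
```

Because the pending wakeup is remembered by the selector, an extra flag check before entering the poll is not needed for correctness in this scenario, which matches the conclusion reached later in the thread.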
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1212 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212 -1 makes the pollClient block forever. So, we don't get a chance to do the wakeup check.

Jason Gustafson wrote: I might be wrong, but I think we can still use NetworkClient.wakeup to interrupt a poll call which is waiting forever.

Jun Rao wrote: Yes, you are right. I missed that.

Jun Rao wrote: Actually, this still seems to be a problem. The issue is that when NetworkClient.wakeup is called, NetworkClient.poll may not yet have been called. In that case, the next NetworkClient.poll may still block for the timeout.

Jason Gustafson wrote: From the javadocs for Selector, the wakeup will apply to the next poll if one is not in progress. But perhaps we should just check the wakeup flag before entering the poll to be safe.

Yes, in that case, this is not an issue. We probably don't have to check the wakeup flag before the poll call since the flag could change immediately after the check.

On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 797-798 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797 Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the end of the call, we expect the fetch offset to be set to the beginning. This is now changed to async, which doesn't match the intended behavior. We need to think through if this matters or not. Ditto for seekToEnd().

Jason Gustafson wrote: Since we always update fetch positions before a new fetch and in position(), it didn't seem necessary to make it synchronous. I thought this handling might be more consistent with how new subscriptions are handled (which are asynchronous and defer the initial offset fetch until the next poll or position). That being said, I don't have a strong feeling about it, so we could return to the blocking version.

Jun Rao wrote: Making this async may be fine. One implication is that if we call position() immediately after seekToBeginning(), we may not be able to get the correct offset.

Jason Gustafson wrote: We should be able to get the right offset since we always update offsets before returning the current position, but we might have to block for it. It's similar to calling subscribe(topic) and then trying to get its position immediately.

That may work. However, if one calls seekToBeginning() followed by seekToEnd(), will we guarantee that position() returns the end offset?

- Jun

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190 ---
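
One way the seekToBeginning()-then-seekToEnd() question could come out as "yes" is if each seek only records a pending reset and the most recent one overwrites the previous one, with position() resolving it lazily. The sketch below is a simplified, hypothetical model of that idea. `LazySeekDemo` and its fields are illustrative names, the fixed `logStart` and `logEnd` values stand in for a blocking offset lookup, and the thread does not confirm that the patch works this way.

```java
import java.util.HashMap;
import java.util.Map;

class LazySeekDemo {
    enum Reset { EARLIEST, LATEST }

    // Hypothetical per-partition state; not the real SubscriptionState.
    private final Map<String, Reset> pendingReset = new HashMap<>();
    private final Map<String, Long> positions = new HashMap<>();
    private final long logStart;
    private final long logEnd;

    LazySeekDemo(long logStart, long logEnd) {
        this.logStart = logStart;
        this.logEnd = logEnd;
    }

    // Both seeks are asynchronous here: they only record what is needed.
    // A later seek replaces an earlier pending one for the same partition.
    void seekToBeginning(String partition) {
        pendingReset.put(partition, Reset.EARLIEST);
    }

    void seekToEnd(String partition) {
        pendingReset.put(partition, Reset.LATEST);
    }

    // Resolves any pending reset before answering, which is where a real
    // client might have to block on an offset lookup.
    long position(String partition) {
        Reset reset = pendingReset.remove(partition);
        if (reset != null) {
            positions.put(partition, reset == Reset.EARLIEST ? logStart : logEnd);
        }
        Long current = positions.get(partition);
        if (current == null) {
            throw new IllegalStateException("no position for " + partition);
        }
        return current;
    }

    public static void main(String[] args) {
        LazySeekDemo state = new LazySeekDemo(0L, 100L);
        state.seekToBeginning("topic-0");
        state.seekToEnd("topic-0");
        System.out.println(state.position("topic-0"));
    }
}
```

In this model the guarantee Jun asks about follows from keeping a single pending reset per partition; a design that queued both resets and applied them out of order would not provide it.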
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87257 --- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139570 Do we need this? There is no real guarantee on the poll time, so it seems that we could just return when wakeup is called. - Jay Kreps On June 5, 2015, 7:45 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 5, 2015, 7:45 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 6:49 p.m., Jay Kreps wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 322 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line322 Do we need this? There is no real guarantee on the poll time, so it seems that we could just return when wakeup is called. You might be waking up from a synchronous commit, for example. In that case, all we can do is raise an exception. We could alternatively say that wakeup only applies to the poll() method and cannot be used to interrupt the other calls. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87257 ---
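The point that wakeup may interrupt any blocking call, not only poll(), can be sketched without the Kafka client at all. The following is a minimal simulation, assuming a shared wakeup flag that every blocking call checks; WakeupSignal, WakeableClient and WakeupSketch are hypothetical stand-ins and not the ConsumerWakeupException or KafkaConsumer from the patch. It shows why an exception is the natural signal: a synchronous commit has no empty result to return early, and the calling thread can catch the exception and close the client.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the wakeup exception; not the class from the patch.
class WakeupSignal extends RuntimeException { }

// Hypothetical stand-in for a consumer whose blocking calls all honor wakeup().
class WakeableClient {
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    volatile boolean closed = false;

    // Intended to be called from a different thread than the one that is blocked.
    void wakeup() {
        wakeupRequested.set(true);
    }

    // Shared by every blocking call, so any of them can be interrupted.
    private void blockUntilWoken() {
        while (true) {
            if (wakeupRequested.compareAndSet(true, false))
                throw new WakeupSignal();
            try {
                Thread.sleep(5); // simulated wait on network I/O
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new WakeupSignal();
            }
        }
    }

    void poll() { blockUntilWoken(); }

    void commitSync() { blockUntilWoken(); }

    void close() { closed = true; }
}

class WakeupSketch {
    // Runs a consumer loop on its own thread, wakes it, and reports whether close() ran.
    static boolean runAndShutDown() {
        WakeableClient client = new WakeableClient();
        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    client.poll();
                }
            } catch (WakeupSignal e) {
                // Expected during shutdown; nothing to recover.
            } finally {
                client.close();
            }
        });
        consumerThread.start();
        client.wakeup();
        try {
            consumerThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return client.closed && !consumerThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(runAndShutDown());
    }
}
```

Restricting wakeup to poll() only, the alternative Jason mentions, would correspond to removing the flag check from commitSync() in this sketch.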
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190 --- Thanks for the patch. A few comments below. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139565 Could we add an example of how to use the new wakeup() call, especially with closing the consumer properly? For example, does the consumer thread just catch the ConsumerWakeupException and then call close()? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139574 Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the end of the call, we expect the fetch offset to be set to the beginning. This is now changed to async, which doesn't match the intended behavior. We need to think through whether this matters or not. Ditto for seekToEnd(). clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139577 Should we pass in tp to isOffsetResetNeeded()? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139509 The returned response may be ready already after the offsetBefore call due to needing metadata refresh. Since we don't check the ready state immediately afterward, we may be delaying the processing of metadata refresh by the request timeout. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139564 Currently, our coding convention is not to wrap single-line statements with {}. There are a few other cases like this. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139544 In the async mode, the response may not be ready in the first iteration. Are we handling the retry properly in that case? 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139545 We probably need to make some changes here when KAFKA-2120 is done to handle the request timeout properly. Perhaps we can add a TODO comment here. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139563 -1 makes the pollClient block forever. So, we don't get a chance to do the wakeup check. clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java https://reviews.apache.org/r/34789/#comment139588 Do we need NONE? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java https://reviews.apache.org/r/34789/#comment139579 We probably can make it clear that this is a non-blocking call and doesn't wait for the request to be sent or the response to be received. It would be good to do that on other similar methods too. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java https://reviews.apache.org/r/34789/#comment139537 Should asynchronous be synchronous? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java https://reviews.apache.org/r/34789/#comment139586 Similar to the above, it would be useful to make it clear that fetchOffsets() is non-blocking. clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java https://reviews.apache.org/r/34789/#comment139534 Perhaps we can define a static method to initialize the constant and set the state. It's clearer that way since the instantiation and the initialization are in the same place. With this, we probably don't need the static getter methods and can just let the caller use the static constants directly. Ditto for BrokerResult. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java https://reviews.apache.org/r/34789/#comment139540 The comment seems inaccurate. 
We are not returning an error, but returning a remedy instead. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java https://reviews.apache.org/r/34789/#comment139504 Do we need both hasRemedy and hasException? It seems that if one returns true, the other should always return false. clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java https://reviews.apache.org/r/34789/#comment139578 To be consistent with the naming convention of the rest of the methods, should we just name it offsetResetNeeded()? - Jun Rao On June 5, 2015, 7:45 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit:
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 995 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line995 Should we pass in tp to isOffsetResetNeeded()? Yes, we should. I'll fix it. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 797-798 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797 Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the end of the call, we expect the fetch offset to be set to the beginning. This is now changed to async, which doesn't match the intended behavior. We need to think through whether this matters or not. Ditto for seekToEnd(). Since we always update fetch positions before a new fetch and in position(), it didn't seem necessary to make it synchronous. I thought this handling might be more consistent with how new subscriptions are handled (which are asynchronous and defer the initial offset fetch until the next poll or position). That being said, I don't have a strong feeling about it, so we could return to the blocking version. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1039-1040 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039 The returned response may be ready already after the offsetBefore call due to needing metadata refresh. Since we don't check the ready state immediately afterward, we may be delaying the processing of metadata refresh by the request timeout. This is a pretty good point. One of the reasons working with NetworkClient is tricky is that you need several polls to complete a request: one to connect, one to send, and one to receive. In this case, the result might not be ready because we are in the middle of connecting to the broker, in which case we need to call poll() to finish the connect. 
If we don't, then the next request will just fail for the same reason. I'll look to see if there's a way to fix this to avoid unnecessary calls to poll. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1139-1141 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1139 In the async mode, the response may not be ready in the first iteration. Are we handling the retry properly in that case? When an async commit request fails, we do not retry, which is consistent with the current consumer. I think Ewen's patch for KAFKA-2123 introduces a good approach to retrying async commits. My own preference is to let them fail fast as long as the user has the callback from KAFKA-2123 to handle their failure. Otherwise, it gets a little tricky trying to preserve their order. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1195 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1195 We probably need to make some changes here when KAFKA-2120 is done to handle the request timeout properly. Perhaps we can add a TODO comment here. I'll add a note. This also falls in the purview of KAFKA-1894. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java, line 16 https://reviews.apache.org/r/34789/diff/8/?file=980077#file980077line16 Do we need NONE? It was there before, but I don't think it's actually used. I'd be fine removing it. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java, lines 20-27 https://reviews.apache.org/r/34789/diff/8/?file=980080#file980080line20 Perhaps we can define a static method to initialize the constant and set the state. It's clearer that way since the instantiation and the initialization are in the same place. 
With this, we probably don't need the static getter methods and can just let the caller use the static constants directly. Ditto for BrokerResult. Agreed. I'll fix it. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1212 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212 -1 makes the pollClient block forever. So, we don't get a chance to do the wakeup check. I might be wrong, but I think we can still use NetworkClient.wakeup to interrupt a poll call which is waiting forever. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1078-1080 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1078
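On the question of whether a poll with timeout -1 can still be interrupted: the JDK behavior that would make this possible can be checked in isolation. The sketch below assumes that NetworkClient.wakeup ultimately delegates to java.nio.channels.Selector.wakeup(), which the thread does not confirm (Jason himself says he might be wrong). It uses only the plain JDK selector; the class and method names (SelectorWakeupSketch, blockThenWake) are illustrative.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class SelectorWakeupSketch {
    // Blocks in select() with no timeout and lets a second thread release it.
    // Returns the elapsed time in milliseconds, or -1 if anything went wrong.
    static long blockThenWake(long delayMs) {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup(); // causes a blocked (or the next) select() to return
            });
            long start = System.nanoTime();
            waker.start();
            // No channels are registered and no timeout is given, so without
            // wakeup() this call would never return.
            int readyKeys = selector.select();
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            waker.join();
            return readyKeys == 0 ? elapsedMs : -1;
        } catch (IOException | InterruptedException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("select() returned after ~" + blockThenWake(100) + " ms");
    }
}
```

If the assumption holds, an infinite poll timeout does not by itself prevent the wakeup check; what matters is that the wakeup flag is examined after the selector returns.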
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 5, 2015, 7:02 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 
clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 5, 2015, 7:45 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 
cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 4, 2015, 9:36 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
On June 4, 2015, 6:21 p.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 671 https://reviews.apache.org/r/34789/diff/5/?file=978185#file978185line671 It looks like these were already issues before this code was rearranged. Are there JIRAs filed for them yet? I think they're covered by KAFKA-1894. On June 4, 2015, 6:21 p.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1110 https://reviews.apache.org/r/34789/diff/5/?file=978185#file978185line1110 This is definitely optional, but it might make sense to just inline this to discourage anyone from using it anywhere else -- this is really the only place it should be needed. I've removed this since it's not actually needed. If the coordinator is not known, the request will fail immediately and ensureCoordinator will then be invoked. On June 4, 2015, 6:21 p.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 391 https://reviews.apache.org/r/34789/diff/5/?file=978185#file978185line391 If this is going to be left to be fixed with the patch for KIP-19, we should make sure that's noted in the associated JIRA. I've added a comment to KAFKA-2120. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86651 --- On June 4, 2015, 9:36 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 4, 2015, 9:36 p.m.) Review request for kafka. 
Re: Review Request 34789: Patch for KAFKA-2168
On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1091 https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1091 Since poll() can trigger auto offset commits, and then the commits can block while polling() for some time, can we end up recursing in some bad situations, e.g. if we consistently cannot get a coordinator? We might need to keep track of whether a commit is outstanding and not try to commit again, or just update the values we're trying to commit. To clarify the issue: in poll(timeout), we check if it is time to autocommit and call commit, which then calls commitOffsets. In commitOffsets, we have a while(true) loop and in it we poll for both sync and async. If that polling process takes long enough, then we could hit the next interval and those poll() calls could trigger another call to commit. Now we have 2 calls to commit on the stack. I don't think this is likely, and I'm not certain there's a condition where you can get stuck in the loop that long when using async commits. But since the logic in commitOffsets currently checks response.isReady first, *then* breaks if it's async, I thought it might be possible that during a connectivity issue with the coordinator, you might just get stuck in this loop even in async commit mode and trigger this recursive commit behavior (given a long enough outage/short enough auto commit interval). On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java, line 15 https://reviews.apache.org/r/34789/diff/3/?file=976967#file976967line15 The classes named XResponse may be a bit confusing because the protocol responses use that terminology. Future? Result? Jason Gustafson wrote: Agreed. In fact, they were XResult initially. 
I changed them because BrokerResult and CoordinatorResult didn't seem to suggest as clearly what they were for as BrokerResponse and CoordinatorResponse. I considered Future as well, but its usage is a bit different than traditional Java Futures. Perhaps XReply? Even though there's no blocking get(), XFuture might be the clearest. XReply would work, but has a similar issue that it gets confusing whether XResponse or XReply is the actual message received back vs. the processed data that you wanted to extract. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86338 --- On June 4, 2015, 4:07 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 4, 2015, 4:07 a.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2186; fix rebase error and checkstyle issue Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java fac79951d50ef6f19cef5fe62cbc4582b27b145a 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java c5e577ff98bea3de65e290d30065935a29b3247f clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b
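To make the suggestion above concrete ("keep track if a commit is outstanding and not try to commit again, or just update the values we're trying to commit"), here is a minimal sketch. It is not code from the patch: the class CommitGuardSketch, its fields, and the String-keyed offset map are all hypothetical stand-ins chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names come from the patch under review. */
class CommitGuardSketch {
    private boolean commitInFlight = false;
    private Map<String, Long> pending = null; // latest offsets seen while a commit is outstanding
    private int requestsSent = 0;

    /** Called from the poll loop whenever the auto-commit interval has elapsed. */
    void maybeCommit(Map<String, Long> offsets) {
        if (commitInFlight) {
            // Do not start a second commit; only remember the newest offsets.
            pending = new HashMap<>(offsets);
            return;
        }
        send(offsets);
    }

    /** Called when the response for the outstanding commit arrives. */
    void onCommitComplete() {
        commitInFlight = false;
        if (pending != null) {
            Map<String, Long> next = pending;
            pending = null;
            send(next);
        }
    }

    // Stand-in for building and sending the real commit request.
    private void send(Map<String, Long> offsets) {
        commitInFlight = true;
        requestsSent++;
    }

    int requestsSent() { return requestsSent; }
    boolean commitInFlight() { return commitInFlight; }
}
```

With a guard like this, a second auto-commit that fires during a coordinator outage would only overwrite the pending offsets instead of adding another commit call to the stack.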
Re: Review Request 34789: Patch for KAFKA-2168
On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1091 https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1091 Since poll() can trigger auto offset commits, and then the commits can block while polling for some time, can we end up recursing in some bad situations, e.g. if we consistently cannot get a coordinator? We might need to keep track of whether a commit is outstanding and not try to commit again, or just update the values we're trying to commit. Ewen Cheslack-Postava wrote: To clarify the issue: in poll(timeout), we check if it is time to autocommit and call commit, which then calls commitOffsets. In commitOffsets, we have a while(true) loop and in it we poll for both sync and async. If that polling process takes long enough, then we could hit the next interval and those poll() calls could trigger another call to commit. Now we have 2 calls to commit on the stack. I don't think this is likely, and I'm not certain there's a condition where you can get stuck in the loop that long when using async commits. But since the logic in commitOffsets currently checks response.isReady first, *then* breaks if it's async, I thought it might be possible that during a connectivity issue with the coordinator, you might just get stuck in this loop even in async commit mode and trigger this recursive commit behavior (given a long enough outage/short enough auto commit interval). I think there might be some confusion caused by the overloading of the poll method. The poll that is called in commitOffsets is basically just a wrapper around client.poll() and doesn't recurse. Perhaps I can rename these methods to clientPoll() to make this clearer? - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86338 --- On June 4, 2015, 4:07 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. 
To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 4, 2015, 4:07 a.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2186; fix rebase error and checkstyle issue Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java fac79951d50ef6f19cef5fe62cbc4582b27b145a clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java c5e577ff98bea3de65e290d30065935a29b3247f clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86651 --- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138705 If this is going to be left to be fixed with the patch for KIP-19, we should make sure that's noted in the associated JIRA. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138706 It looks like these were already issues before this code was rearranged. Are there JIRAs filed for them yet? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138744 javadoc is out of date - strategy is no longer an argument here clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138743 This is definitely optional, but it might make sense to just inline this to discourage anyone from using it anywhere else -- this is really the only place it should be needed. LGTM once rebased against trunk, only had a few minor comments. The naming of the DelayedResult classes could be changed, but I think none of the options we have are entirely clear so I wouldn't worry about that naming too much. - Ewen Cheslack-Postava On June 4, 2015, 4:07 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 4, 2015, 4:07 a.m.) Review request for kafka. 
Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2186; fix rebase error and checkstyle issue Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java fac79951d50ef6f19cef5fe62cbc4582b27b145a clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java c5e577ff98bea3de65e290d30065935a29b3247f clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1091 https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1091 Since poll() can trigger auto offset commits, and then the commits can block while polling for some time, can we end up recursing in some bad situations, e.g. if we consistently cannot get a coordinator? We might need to keep track of whether a commit is outstanding and not try to commit again, or just update the values we're trying to commit. Ewen Cheslack-Postava wrote: To clarify the issue: in poll(timeout), we check if it is time to autocommit and call commit, which then calls commitOffsets. In commitOffsets, we have a while(true) loop and in it we poll for both sync and async. If that polling process takes long enough, then we could hit the next interval and those poll() calls could trigger another call to commit. Now we have 2 calls to commit on the stack. I don't think this is likely, and I'm not certain there's a condition where you can get stuck in the loop that long when using async commits. But since the logic in commitOffsets currently checks response.isReady first, *then* breaks if it's async, I thought it might be possible that during a connectivity issue with the coordinator, you might just get stuck in this loop even in async commit mode and trigger this recursive commit behavior (given a long enough outage/short enough auto commit interval). Jason Gustafson wrote: I think there might be some confusion caused by the overloading of the poll method. The poll that is called in commitOffsets is basically just a wrapper around client.poll() and doesn't recurse. Perhaps I can rename these methods to clientPoll() to make this clearer? Ah, yes, that makes sense. I was getting a bit confused when reading some of the code about which poll was being invoked. Adjusting the names would probably help. - Ewen --- This is an automatically generated e-mail. 
To reply, visit: https://reviews.apache.org/r/34789/#review86338 --- On June 4, 2015, 4:07 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 4, 2015, 4:07 a.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2186; fix rebase error and checkstyle issue Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java fac79951d50ef6f19cef5fe62cbc4582b27b145a clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java c5e577ff98bea3de65e290d30065935a29b3247f clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
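The distinction discussed above, between the consumer-level poll(timeout) that may trigger an auto-commit and the inner wrapper around client.poll() that only drives network I/O, can be sketched roughly as follows. This is an illustration under assumptions, not the patch itself: the fake clock, the pollsPerCommit parameter, and the depth counters are hypothetical and exist only to show that the commit loop cannot re-enter itself.

```java
/** Hypothetical sketch of the poll()/clientPoll() split; not code from the patch. */
class PollNamingSketch {
    private final long autoCommitIntervalMs;
    private final int pollsPerCommit; // network polls needed before a commit response arrives
    private long nowMs = 0;           // fake clock advanced by clientPoll()
    private long lastCommitMs = 0;
    private int commits = 0;
    private int depth = 0;
    private int maxDepth = 0;

    PollNamingSketch(long autoCommitIntervalMs, int pollsPerCommit) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.pollsPerCommit = pollsPerCommit;
    }

    /** Consumer-level poll: the only place where the auto-commit check happens. */
    void poll(long timeoutMs) {
        if (nowMs - lastCommitMs >= autoCommitIntervalMs)
            commitOffsets();
        clientPoll(timeoutMs);
    }

    private void commitOffsets() {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        commits++;
        // Even if this loop outlasts the auto-commit interval, it calls
        // clientPoll(), which never checks the interval, so it cannot recurse.
        for (int i = 0; i < pollsPerCommit; i++)
            clientPoll(100);
        lastCommitMs = nowMs;
        depth--;
    }

    /** Stand-in for the thin wrapper around client.poll(): network I/O only. */
    private void clientPoll(long timeoutMs) {
        nowMs += timeoutMs;
    }

    int commits() { return commits; }
    int maxCommitDepth() { return maxDepth; }
}
```

Giving the inner method a distinct name, as proposed in the thread, makes this property visible at the call site.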
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86338 --- This looks good so far. I think it's much easier to understand when all the blocking stuff happens at the KafkaConsumer level and each of the classes it uses only ever handles single requests. It'd be nice to document the basic architecture somewhere since it took me a bit to fully figure it out. (Unfortunately, since the javadocs for the consumer are in the implementation class KafkaConsumer instead of on the Consumer interface, we can't put this with the KafkaConsumer class...) Some notes in addition to the inline stuff: Some functionality has been pulled back up to KafkaConsumer, in a mild reversal of Guozhang's refactoring. It'd be nice to keep this to a minimum. The ones that stuck out to me were resetOffsets()/resetOffset()/offsetBefore(). I'm guessing you also couldn't figure out a way to keep it in Fetcher since the inner call to offsetBefore() requires that blocking loop? Some handling of DelayedResponses and its subclasses seem redundant/follows a common pattern and maybe could be refactored into utility code. However, there are few enough places it's happening now that I don't think it's a big deal. It does seem a bit wasteful that we have to continually create these DelayedResponse objects even in cases where we know we'll fail fast, but I suppose those cases should be unusual and the cost to allocate them isn't all that high. Finally, a readability/cleanliness thing. This patch adds more nested anonymous RequestCompletionHandler classes. I think these are fine as they are, but if the implementations get too long or branchy with all the various error conditions they can become unreadably over-indented. Taking some of the big ones and using named nested classes might help improve clarity, although it does separate the request initiating code from the response handling code. 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138296 This seems like it'll be analogous to KIP-19's request.timeout.ms since it looks like all the use cases are sending a single request and waiting for a single response. I don't think there's any patch ready for that yet (check w/ the JIRA and maybe Jiangjie has something that hasn't been submitted yet), but if that ends up accepted we could potentially add that flag in either patch. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138299 Might want to move this to trace. Normal consumers are going to hit this *a lot* clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138304 I think this is simpler and clearer if you reorder these two -- reset the offsets that need it first, then update fetch positions for ones that are missing it. I think this removes the extra conditional !subscriptions.offsetResetNeeded(tp) that was added in updateFetchPositions too. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138305 Any reason not to do this? Definitely seems like it'd be necessary although the logic might be more complicated than just moving maybeHearbeat() in here -- some of the other lead up to this could change if you saw a partition reassignment in the middle of a long poll, requiring fetch positions to be updated, etc. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138320 Besides grouping by node, we could also send these requests out in parallel. The drawback to simplifying this all to use a series of offsetBefore() calls is that each blocks, so resetting a bunch of offsets is going to be pretty slow. 
Obvious solution is a utility that lets you run a bunch of requests in parallel, then do the same looping you're doing waiting for a single response but handle a bunch all at once. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138324 Since poll() can trigger auto offset commits, and then the commits can block while polling() for some time, can we end up recursing in some bad situations, e.g. if we consistently cannot get a coordinator? We might need to keep track if a commit is outstanding and not try to commit again, or just update the values we're trying to commit. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment138323 In async mode, why bother polling at all here? You mentioned coordinator connection issues in the comment below, but if we need to
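The "utility that lets you run a bunch of requests in parallel" mentioned above might look something like the sketch below. It assumes a hypothetical PendingResult type standing in for the patch's delayed result classes, and it models each call to client.poll() as one round that advances every outstanding request. The real client API is not used here.

```java
import java.util.List;

/** Hypothetical sketch of waiting on many requests at once; not code from the patch. */
class ParallelRequestsSketch {
    /** Stand-in for a delayed result that becomes ready after some number of polls. */
    static class PendingResult {
        private int pollsRemaining;
        PendingResult(int pollsRemaining) { this.pollsRemaining = pollsRemaining; }
        boolean isReady() { return pollsRemaining <= 0; }
        void advance() { if (pollsRemaining > 0) pollsRemaining--; }
    }

    private static boolean allReady(List<PendingResult> results) {
        for (PendingResult r : results)
            if (!r.isReady()) return false;
        return true;
    }

    /** Requests already sent: poll until all are ready. Returns the number of poll rounds. */
    static int awaitAll(List<PendingResult> results) {
        int rounds = 0;
        while (!allReady(results)) {
            // One simulated client.poll() makes progress on every outstanding request.
            for (PendingResult r : results) r.advance();
            rounds++;
        }
        return rounds;
    }

    /** The blocking one-at-a-time pattern, for comparison. */
    static int awaitSequentially(List<PendingResult> results) {
        int rounds = 0;
        for (PendingResult r : results) {
            while (!r.isReady()) {
                r.advance();
                rounds++;
            }
        }
        return rounds;
    }
}
```

In this toy model the parallel wait costs as many rounds as the slowest request, while the sequential wait costs the sum over all requests, which is the slowdown the comment is worried about.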
Re: Review Request 34789: Patch for KAFKA-2168
On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: This looks good so far. I think it's much easier to understand when all the blocking stuff happens at the KafkaConsumer level and each of the classes it uses only ever handles single requests. It'd be nice to document the basic architecture somewhere since it took me a bit to fully figure it out. (Unfortunately, since the javadocs for the consumer are in the implementation class KafkaConsumer instead of on the Consumer interface, we can't put this with the KafkaConsumer class...) Some notes in addition to the inline stuff: Some functionality has been pulled back up to KafkaConsumer, in a mild reversal of Guozhang's refactoring. It'd be nice to keep this to a minimum. The ones that stuck out to me were resetOffsets()/resetOffset()/offsetBefore(). I'm guessing you also couldn't figure out a way to keep it in Fetcher since the inner call to offsetBefore() requires that blocking loop? Some handling of DelayedResponses and its subclasses seem redundant/follows a common pattern and maybe could be refactored into utility code. However, there are few enough places it's happening now that I don't think it's a big deal. It does seem a bit wasteful that we have to continually create these DelayedResponse objects even in cases where we know we'll fail fast, but I suppose those cases should be unusual and the cost to allocate them isn't all that high. Finally, a readability/cleanliness thing. This patch adds more nested anonymous RequestCompletionHandler classes. I think these are fine as they are, but if the implementations get too long or branchy with all the various error conditions they can become unreadably over-indented. Taking some of the big ones and using named nested classes might help improve clarity, although it does separate the request initiating code from the response handling code. Yeah, that's right. The offsetBefore method was the tricky one to deal with. 
I tried to keep the lower level logic in Fetcher, but resetOffsets ended up leaking into KafkaConsumer. That was before I started using the DelayedResponse interface though, so there may be a way to move it back to Fetcher. We could definitely reduce some of the DelayedResponse object creation using several static instances. I think there are only a couple cases where it is actually necessary to have a new instance. I also think the usage is currently a little nasty since you have to make sure that the delayed response is finished through all code paths. That could As you mention, it's kind of nice having the handler code right there in the method. I actually kind of like the pattern used in some other cases where instead of a nested class, an anonymous class is created which simply delegates to a handler method. On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 683 https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line683 I think this is simpler and clearer if you reorder these two -- reset the offsets that need it first, then update fetch positions for ones that are missing it. I think this removes the extra conditional !subscriptions.offsetResetNeeded(tp) that was added in updateFetchPositions too. Yeah, I agree. I'll have to rework the code a little bit since updateFetchPositions can lead to an offset reset (in the case that the fetched and committed positions are both null), but I can probably handle that case in a separate method. On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 696 https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line696 Any reason not to do this? 
Definitely seems like it'd be necessary although the logic might be more complicated than just moving maybeHearbeat() in here -- some of the other lead up to this could change if you saw a partition reassignment in the middle of a long poll, requiring fetch positions to be updated, etc. No particular reason. I just hadn't thought it all the way through yet. Perhaps to make it easy, we could just exit the poll call if we need a partition reassignment or an offset update? On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 988 https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line988 Besides grouping by node, we could also send these requests out in parallel. The drawback to simplifying this all to use a series of offsetBefore() calls is that each blocks, so resetting a bunch of offsets is going to be pretty slow. Obvious solution is a utility that lets you run a bunch of requests in parallel, then do the same
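The pattern Jason describes in his reply, an anonymous class that simply delegates to a handler method, can be sketched as below. The CompletionHandler interface here is a hypothetical single-method stand-in defined only for this example, and the String response is a placeholder for a real response object.

```java
/** Hypothetical sketch of an anonymous handler delegating to a method; not code from the patch. */
class HandlerDelegationSketch {
    /** Stand-in for a request completion callback interface. */
    interface CompletionHandler {
        void onComplete(String response);
    }

    private String lastOffsetResponse;

    /** Created next to the code that sends the request, so the two stay close together. */
    CompletionHandler offsetHandler() {
        return new CompletionHandler() {
            @Override
            public void onComplete(String response) {
                // Keep the anonymous class to one line; the branching lives in a named method.
                handleOffsetResponse(response);
            }
        };
    }

    // Error handling and other branches can grow here without deepening the indentation.
    private void handleOffsetResponse(String response) {
        lastOffsetResponse = response;
    }

    String lastOffsetResponse() { return lastOffsetResponse; }
}
```

This keeps the handler visible at the request site while avoiding the over-indentation concern raised in the review.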
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 4, 2015, 1:21 a.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java fac79951d50ef6f19cef5fe62cbc4582b27b145a clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java c5e577ff98bea3de65e290d30065935a29b3247f clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 4, 2015, 4:07 a.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2186; fix rebase error and checkstyle issue Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java fac79951d50ef6f19cef5fe62cbc4582b27b145a clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java c5e577ff98bea3de65e290d30065935a29b3247f clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java 
b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote: This looks good so far. I think it's much easier to understand when all the blocking stuff happens at the KafkaConsumer level and each of the classes it uses only ever handles single requests. It'd be nice to document the basic architecture somewhere since it took me a bit to fully figure it out. (Unfortunately, since the javadocs for the consumer are in the implementation class KafkaConsumer instead of on the Consumer interface, we can't put this with the KafkaConsumer class...) Some notes in addition to the inline stuff: Some functionality has been pulled back up to KafkaConsumer, in a mild reversal of Guozhang's refactoring. It'd be nice to keep this to a minimum. The ones that stuck out to me were resetOffsets()/resetOffset()/offsetBefore(). I'm guessing you also couldn't figure out a way to keep it in Fetcher since the inner call to offsetBefore() requires that blocking loop? Some handling of DelayedResponses and its subclasses seem redundant/follows a common pattern and maybe could be refactored into utility code. However, there are few enough places it's happening now that I don't think it's a big deal. It does seem a bit wasteful that we have to continually create these DelayedResponse objects even in cases where we know we'll fail fast, but I suppose those cases should be unusual and the cost to allocate them isn't all that high. Finally, a readability/cleanliness thing. This patch adds more nested anonymous RequestCompletionHandler classes. I think these are fine as they are, but if the implementations get too long or branchy with all the various error conditions they can become unreadably over-indented. Taking some of the big ones and using named nested classes might help improve clarity, although it does separate the request initiating code from the response handling code. Jason Gustafson wrote: Yeah, that's right. The offsetBefore method was the tricky one to deal with. 
I tried to keep the lower level logic in Fetcher, but resetOffsets ended up leaking into KafkaConsumer. That was before I started using the DelayedResponse interface though, so there may be a way to move it back to Fetcher. We could definitely reduce some of the DelayedResponse object creation using several static instances. I think there are only a couple cases where it is actually necessary to have a new instance. I also think the usage is currently a little nasty since you have to make sure that the delayed response is finished through all code paths. That could As you mention, it's kind of nice having the handler code right there in the method. I actually kind of like the pattern used in some other cases where instead of a nested class, an anonymous class is created which simply delegates to a handler method. Forgot to finish my thought about the nastiness of DelayedResponse usage... The issue is having to always ensure that the DelayedResponse is finished. If it doesn't get finished, then the consumer ends up retrying the request (after timing out). The code might just need to be restructured a little bit so that it's clearer that it does get finished in all cases. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review86338 --- On June 3, 2015, 12:10 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 3, 2015, 12:10 a.m.) Review request for kafka. 
Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14
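One way to make it "clearer that it does get finished in all cases", as Jason puts it above, is to centralize completion in a finally block. The sketch below is only an assumption about how that could look: the DelayedResult class, its respond() and needRetry() methods, and the handle() function are hypothetical and do not reproduce the patch's actual API.

```java
/** Hypothetical sketch of finishing a delayed result on every code path; not code from the patch. */
class FinishAlwaysSketch {
    /** Stand-in for the patch's delayed result type. */
    static class DelayedResult {
        private boolean ready;
        private boolean retryNeeded;
        private String value;

        void respond(String v) { value = v; ready = true; }
        void needRetry() { retryNeeded = true; ready = true; }
        boolean isReady() { return ready; }
        boolean retryNeeded() { return retryNeeded; }
        String value() { return value; }
    }

    /** Response handler with an early exit that forgets to complete the result. */
    static void handle(String response, DelayedResult result) {
        try {
            if (response == null)
                return; // early exit without completing the result
            result.respond(response);
        } finally {
            // Safety net: any path that did not complete the result marks it for retry,
            // so the caller never has to wait for a timeout to find out.
            if (!result.isReady())
                result.needRetry();
        }
    }
}
```

The design choice is that forgetting to finish the result in some branch degrades to an immediate retry rather than a timeout followed by a retry.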
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 3, 2015, 12:10 a.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; refactored callback handling to prevent unnecessary requests Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 1, 2015, 11:04 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description (updated) --- KAFKA-2168; move blocking calls into KafkaConsumer to enable async wakeup() Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetchResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review85644 --- I think close() isn't quite right, and is probably harder than just wakeup(). Also, I think there are other cases where NetworkClient.poll() is called in a loop that aren't handled, e.g. NetworkClient.completeAll. I'm not sure these can be handled without pushing the closed flag into NetworkClient (maybe changing the name to closing to allow some operations to continue normally so code using NetworkClient can finish up whatever it was doing). clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment137308 Missed removing one of the synchronized keywords. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment137313 I think this requires a bit more coordination between the threads. As written, won't this wake up the selector while this thread continues running and closes metrics/client/serializers before the other thread is done with them? This gets confusing if we need to support both close() from the poll() thread and from another thread, I guess -- in one case you need to wait for another thread to finish, in the other you can proceed immediately. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java https://reviews.apache.org/r/34789/#comment137314 Why do these all have ensureNotClosed(), but the KafkaConsumer methods don't all have it? - Ewen Cheslack-Postava On May 28, 2015, 10:58 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated May 28, 2015, 10:58 p.m.) Review request for kafka. 
Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; Remove synchronization of KafkaConsumer to enable non-blocking close Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34789: Patch for KAFKA-2168
On May 28, 2015, 11:30 p.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 857 https://reviews.apache.org/r/34789/diff/1/?file=973873#file973873line857 I think this requires a bit more coordination between the threads. As written, won't this wake up the selector while this thread continues running and closes metrics/client/serializers before the other thread is done with them? This gets confusing if we need to support both close() from the poll() thread and from another thread, I guess -- in one case you need to wait for another thread to finish, in the other you can proceed immediately. Good point. Makes me think that we should bring the synchronization back for everything except close. Then we could write close like this:
```
if (client != null)
    client.wakeup();
synchronized (this) {
    // Shutdown client, serializers, etc.
}
```
- Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review85644 --- On May 28, 2015, 10:58 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated May 28, 2015, 10:58 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; Remove synchronization of KafkaConsumer to enable non-blocking close Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
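To make the close() proposal above concrete, here is a small self-contained sketch of the idea: every method except close() stays synchronized, and close() first wakes a blocked poll() from outside the lock, then takes the lock to shut things down. SketchClient and SketchConsumer are hypothetical stand-ins, not the real NetworkClient or KafkaConsumer; the wait/notify-based poll() only imitates a selector that can be woken up.

```java
// Hypothetical stand-in for the network client; only the wakeup behaviour matters here.
class SketchClient {
    private final Object lock = new Object();
    private boolean wakeupRequested = false;
    private boolean closed = false;

    // Blocks until wakeup() is called or the timeout elapses; returns true if woken.
    boolean poll(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (lock) {
            while (!wakeupRequested) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                    return false;
                lock.wait(remaining);
            }
            wakeupRequested = false;   // consume the wakeup, like a selector would
            return true;
        }
    }

    // Safe to call from any thread; a wakeup issued before poll() is not lost.
    void wakeup() {
        synchronized (lock) {
            wakeupRequested = true;
            lock.notifyAll();
        }
    }

    void close() {
        synchronized (lock) { closed = true; }
    }

    boolean isClosed() {
        synchronized (lock) { return closed; }
    }
}

class SketchConsumer {
    private final SketchClient client = new SketchClient();
    private boolean closed = false;    // guarded by the consumer's monitor

    // Like every method other than close(), poll() holds the consumer lock.
    synchronized boolean poll(long timeoutMs) throws InterruptedException {
        if (closed)
            throw new IllegalStateException("consumer is closed");
        return client.poll(timeoutMs);
    }

    void close() {
        // Outside the lock: interrupt a poll() that may be blocking in another thread.
        client.wakeup();
        synchronized (this) {
            // The polling thread has left poll() and released the lock by now,
            // so shutting down shared resources cannot race with it.
            closed = true;
            client.close();
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}
```

One caveat the sketch does not address: a thread that calls poll() in a loop could re-enter poll() after being woken but before close() acquires the lock, in which case close() would wait for that second poll() to time out. Handling that presumably needs a closing flag that is checked before blocking, along the lines suggested in the review.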