Re: Review Request 34789: Patch for KAFKA-2168

2015-06-30 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 30, 2015, 5:55 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; minor fixes


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 
  core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
f56096b826bdbf760411a54ba067a6a83eca8a10 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-30 Thread Jason Gustafson


 On June 30, 2015, 10:24 p.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 933-937
  https://reviews.apache.org/r/34789/diff/14/?file=996120#file996120line933
 
  Wondering why we want to fetch for all assigned partitions if the 
  requested partitions is indeed assigned here?

I think this is how the old code was written, but I was wondering the same 
thing. I thought perhaps it was an optimization: since we have to hit the 
coordinator anyway, we may as well update all our assigned offsets in case they 
are stale as well.


 On June 30, 2015, 10:24 p.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1151-1153
  https://reviews.apache.org/r/34789/diff/14/?file=996120#file996120line1151
 
  This comments seems out-dated, or otherwise a bit confusing.

I think the comment is still accurate. It's basically just saying that we only 
fetch if the cache is dirty (as indicated by 
subscriptions.refreshCommitsNeeded()). I'll try to make it clearer.


 On June 30, 2015, 10:24 p.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1249-1256
  https://reviews.apache.org/r/34789/diff/14/?file=996120#file996120line1249
 
  If the coordinator is not available, would this async commit also be 
  blocked here? Or will this be resolved after we introduced the delayed 
  queue in KAFKA-2123?

Yes, that's right, and note that this is consistent with the behavior before 
this patch (just now it's a bit more explicit). The major problem is that we 
don't have any data structure to keep track of needed requests, so if the 
coordinator is not available and we don't want to block to wait for it, then 
all we can do is discard the commit. I think the delayed queued may be what we 
need to solve this problem since we can just reschedule the commit for a later 
time if the coordinator is not available. At the same time, I feel a little 
wary about having a backlog of commits that pile up when the coordinator is 
down. I'd almost rather just fail the commit and let it be retried on the next 
interval, but that has disadvantages as well.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review89971
---


On June 30, 2015, 5:55 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 30, 2015, 5:55 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; minor fixes
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 
   core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
 f56096b826bdbf760411a54ba067a6a83eca8a10 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-30 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review89971
---



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(lines 933 - 937)
https://reviews.apache.org/r/34789/#comment142876

Wondering why we want to fetch for all assigned partitions if the requested 
partitions is indeed assigned here?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(lines 1149 - 1151)
https://reviews.apache.org/r/34789/#comment142873

This comments seems out-dated, or otherwise a bit confusing.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(lines 1247 - 1254)
https://reviews.apache.org/r/34789/#comment142874

If the coordinator is not available, would this async commit also be 
blocked here? Or will this be resolved after we introduced the delayed queue in 
KAFKA-2123?


One minor comments that are out of the scope of this follow-up patch: in 
Coordinator.handleOffsetResponse, could we rename to handleFetchOffsetResponse?

- Guozhang Wang


On June 30, 2015, 5:55 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 30, 2015, 5:55 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; minor fixes
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 
   core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
 f56096b826bdbf760411a54ba067a6a83eca8a10 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-23 Thread Jason Gustafson


 On June 23, 2015, 4:20 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 355
  https://reviews.apache.org/r/34789/diff/12/?file=990006#file990006line355
 
  What's the logic to initiate connection to coordinator if the 
  coordinator is not available during HB?

As it's currently written, we'd skip a heartbeat if we don't have an active 
connection to the coordinator. As long as the heartbeat frequency is 3 or more 
times per session timeout, this is probably ok, but we might want to handle it 
better if we end up exposing the heartbeat frequency in configuration 
(currently it's hard-coded). Perhaps we can fix this in a separate ticket?


 On June 23, 2015, 4:20 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 351-354
  https://reviews.apache.org/r/34789/diff/12/?file=990003#file990003line351
 
  These seem redundant give the code below.

I think it's still necessary to call wakeup to abort a long poll if you want to 
ensure timely shutdown. You could probably get away without the closed flag and 
just use the ConsumerWakeupException to close the consumer, but the explicit 
flag seems cleaner.


 On June 23, 2015, 4:20 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 436
  https://reviews.apache.org/r/34789/diff/12/?file=990003#file990003line436
 
  Should this be volatile so that different threads can see the latest 
  value of refcount?

I think you are right. Fixed in latest patch.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88914
---


On June 23, 2015, 4:39 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 23, 2015, 4:39 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; make refcount in KafkaConsumer an AtomicInteger
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-23 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 23, 2015, 4:39 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; make refcount in KafkaConsumer an AtomicInteger


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
9be8fbc648369ad9db1a7eea94bc1b9edbfdbfd7 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-22 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88796
---

Ship it!


I did not review it thoroughly but the design looks clean to me. Great work!

I think we can check it in to unblock other JIRAs, and come back to it when 
necessary in the future for any follow-up work.


clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 (lines 15 - 18)
https://reviews.apache.org/r/34789/#comment141379

We would like to have a serialVersionUID for any classes extending 
Serializable. You can take a look at ConfigException for example.


- Guozhang Wang


On June 19, 2015, 4:19 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 19, 2015, 4:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; address more review comments
 
 
 KAFKA-2168; updated for review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  41cb9458f51875ac9418fce52f264b35adba92f4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
 f73eedb030987f018d8446bb1dcd98d19fa97331 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  1454ab73df22cce028f41f74b970628829da4e9d 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-22 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 22, 2015, 11:35 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


KAFKA-2168; fix rebase error and checkstyle issue


KAFKA-2168; address review comments and add docs


KAFKA-2168; handle polling with timeout 0


KAFKA-2168; timeout=0 means return immediately


KAFKA-2168; address review comments


KAFKA-2168; address more review comments


KAFKA-2168; updated for review comments


KAFKA-2168; add serialVersionUID to ConsumerWakeupException


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
951c34c92710fc4b38d656e99d2a41255c60aeb7 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 41cb9458f51875ac9418fce52f264b35adba92f4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
f73eedb030987f018d8446bb1dcd98d19fa97331 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 1454ab73df22cce028f41f74b970628829da4e9d 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
  clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-22 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88914
---

Ship it!


Thanks for the latest patch. Looks good overall. To avoid holding to this 
relative large patch for too long, I am committed the latest patch to trunk. 
There are a few minor comments below and we can commit any necessary fix in a 
follow up patch.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(lines 335 - 338)
https://reviews.apache.org/r/34789/#comment141530

These seem redundant give the code below.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 420)
https://reviews.apache.org/r/34789/#comment141531

Should this be volatile so that different threads can see the latest value 
of refcount?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 319)
https://reviews.apache.org/r/34789/#comment141529

What's the logic to initiate connection to coordinator if the coordinator 
is not available during HB?


- Jun Rao


On June 22, 2015, 11:35 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 22, 2015, 11:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; address more review comments
 
 
 KAFKA-2168; updated for review comments
 
 
 KAFKA-2168; add serialVersionUID to ConsumerWakeupException
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  41cb9458f51875ac9418fce52f264b35adba92f4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
 f73eedb030987f018d8446bb1dcd98d19fa97331 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  1454ab73df22cce028f41f74b970628829da4e9d 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-19 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 19, 2015, 4:19 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


KAFKA-2168; fix rebase error and checkstyle issue


KAFKA-2168; address review comments and add docs


KAFKA-2168; handle polling with timeout 0


KAFKA-2168; timeout=0 means return immediately


KAFKA-2168; address review comments


KAFKA-2168; address more review comments


KAFKA-2168; updated for review comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
951c34c92710fc4b38d656e99d2a41255c60aeb7 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 41cb9458f51875ac9418fce52f264b35adba92f4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
f73eedb030987f018d8446bb1dcd98d19fa97331 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 1454ab73df22cce028f41f74b970628829da4e9d 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
  clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-18 Thread Jason Gustafson


 On June 18, 2015, 9:59 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1364
  https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1364
 
  This seems like one of these things that is clever but invariably ends 
  up not quite working. Did we actually determine there is a performance hit 
  from just synchronizing. Biased locking and lock elision should make this 
  very cheap right? Given we acquire locks for all the perf stats and there 
  are several stat updates for each poll call I don't see the problem with 
  just synchronizing.
 
 Ewen Cheslack-Postava wrote:
 Adding synchronization has at least one other downside besides 
 performance - you can pretty easily end up deadlocking due to callbacks 
 (consumer rebalance, commit) if you also have other synchronization outside 
 the consumer itself. And if someone has a chance of accessing the consumer 
 from multiple threads, that probably does mean they have some other 
 synchronization (or should). Deadlocks are generally easier to diagnose than 
 synchronization bugs, but you're introducing potential issues either way.
 
 Based on previous conversation, I think this addresses the main concern 
 with the unsynchronized version and actually results in a lot *less* 
 unintuitive behavior than the synchronized version. If I'm already unaware of 
 the fact that I am incorrectly calling the consumer from multiple threads, 
 using the synchronized version is just going to make me think 
 commits/seeks/etc are running really slow as I wait for poll() calls to 
 return, and then blame the library because as far as I know I'm using it 
 correctly and it's just not behaving well. This solution seems a lot better 
 because although it is not guaranteed to catch conflicts, you're a lot more 
 likely to hit them, they get turned into actual exceptions, and the cause is 
 made very clear to you -- it specifically puts the blame on the code calling 
 these methods and indicates that the caller has a bug in their code since 
 they have not properly synchronized access to the consumer.
 
 A real race detector is more likely to pick up these errors, but this is 
 a pretty good way to have a fair chance of catching the error and informing 
 the user, especially since the most likely error is calling a method while a 
 poll() is running, and those should generally have pretty long timeouts. 
 Having to litter the code with try/finally blocks is a definite drawback, but 
 just encourages us to keep the API as small as possible :)

As Ewen said, I think the main advantage is that it forces users into the 
single-threaded access model that we want them to use. However, if we think 
this implementation is a bit too clever (and if we aren't overly concerned 
about the potential for deadlock in the callback), then we could use a 
traditional ReentrantLock and get the same behavior by using tryLock(). I doubt 
the performance impact would be significant.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88441
---


On June 18, 2015, 9:40 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 18, 2015, 9:40 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; address more review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-18 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 18, 2015, 9:40 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


KAFKA-2168; fix rebase error and checkstyle issue


KAFKA-2168; address review comments and add docs


KAFKA-2168; handle polling with timeout 0


KAFKA-2168; timeout=0 means return immediately


KAFKA-2168; address review comments


KAFKA-2168; address more review comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
951c34c92710fc4b38d656e99d2a41255c60aeb7 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 41cb9458f51875ac9418fce52f264b35adba92f4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
f73eedb030987f018d8446bb1dcd98d19fa97331 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 1454ab73df22cce028f41f74b970628829da4e9d 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
  clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-18 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88441
---



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 1357)
https://reviews.apache.org/r/34789/#comment140961

This seems like one of these things that is clever but invariably ends up 
not quite working. Did we actually determine there is a performance hit from 
just synchronizing. Biased locking and lock elision should make this very cheap 
right? Given we acquire locks for all the perf stats and there are several stat 
updates for each poll call I don't see the problem with just synchronizing.


- Jay Kreps


On June 18, 2015, 9:40 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 18, 2015, 9:40 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; address more review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  41cb9458f51875ac9418fce52f264b35adba92f4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
 f73eedb030987f018d8446bb1dcd98d19fa97331 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  1454ab73df22cce028f41f74b970628829da4e9d 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-18 Thread Ewen Cheslack-Postava


 On June 18, 2015, 9:59 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1364
  https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1364
 
  This seems like one of these things that is clever but invariably ends 
  up not quite working. Did we actually determine there is a performance hit 
  from just synchronizing. Biased locking and lock elision should make this 
  very cheap right? Given we acquire locks for all the perf stats and there 
  are several stat updates for each poll call I don't see the problem with 
  just synchronizing.

Adding synchronization has at least one other downside besides performance - 
you can pretty easily end up deadlocking due to callbacks (consumer rebalance, 
commit) if you also have other synchronization outside the consumer itself. And 
if someone has a chance of accessing the consumer from multiple threads, that 
probably does mean they have some other synchronization (or should). Deadlocks 
are generally easier to diagnose than synchronization bugs, but you're 
introducing potential issues either way.

Based on previous conversation, I think this addresses the main concern with 
the unsynchronized version and actually results in a lot *less* unintuitive 
behavior than the synchronized version. If I'm already unaware of the fact that 
I am incorrectly calling the consumer from multiple threads, using the 
synchronized version is just going to make me think commits/seeks/etc are 
running really slow as I wait for poll() calls to return, and then blame the 
library because as far as I know I'm using it correctly and it's just not 
behaving well. This solution seems a lot better because although it is not 
guaranteed to catch conflicts, you're a lot more likely to hit them, they get 
turned into actual exceptions, and the cause is made very clear to you -- it 
specifically puts the blame on the code calling these methods and indicates 
that the caller has a bug in their code since they have not properly 
synchronized access to the consumer.

A real race detector is more likely to pick up these errors, but this is a 
pretty good way to have a fair chance of catching the error and informing the 
user, especially since the most likely error is calling a method while a poll() 
is running, and those should generally have pretty long timeouts. Having to 
litter the code with try/finally blocks is a definite drawback, but just 
encourages us to keep the API as small as possible :)


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88441
---


On June 18, 2015, 9:40 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 18, 2015, 9:40 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; address more review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  41cb9458f51875ac9418fce52f264b35adba92f4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-18 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88451
---



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 725)
https://reviews.apache.org/r/34789/#comment140985

Might help readers to explain that this pipelines the next request with the 
user's processing of the current set of records



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 1344)
https://reviews.apache.org/r/34789/#comment140986

Used only once? This looks like it should either be done inline, or is a 
generic utility that should end up in Utils instead.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 1372)
https://reviews.apache.org/r/34789/#comment140984

This doesn't handle nested calls properly, e.g. poll() - callback - 
seek/commit/etc. You may need currentThread and a refcount so you only clear 
currentThread when the refcount hits 0.


- Ewen Cheslack-Postava


On June 18, 2015, 9:40 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 18, 2015, 9:40 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; address more review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  41cb9458f51875ac9418fce52f264b35adba92f4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
 f73eedb030987f018d8446bb1dcd98d19fa97331 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  1454ab73df22cce028f41f74b970628829da4e9d 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-18 Thread Jason Gustafson


 On June 18, 2015, 11:13 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1379
  https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1379
 
  This doesn't handle nested calls properly, e.g. poll() - callback - 
  seek/commit/etc. You may need currentThread and a refcount so you only 
  clear currentThread when the refcount hits 0.

You're totally right (blush). Do you think it's worth pulling this 
functionality into a separate class?


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88451
---


On June 18, 2015, 9:40 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 18, 2015, 9:40 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; address more review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  41cb9458f51875ac9418fce52f264b35adba92f4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
 f73eedb030987f018d8446bb1dcd98d19fa97331 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  1454ab73df22cce028f41f74b970628829da4e9d 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-18 Thread Ewen Cheslack-Postava


 On June 18, 2015, 11:13 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1379
  https://reviews.apache.org/r/34789/diff/9-10/?file=983137#file983137line1379
 
  This doesn't handle nested calls properly, e.g. poll() - callback - 
  seek/commit/etc. You may need currentThread and a refcount so you only 
  clear currentThread when the refcount hits 0.
 
 Jason Gustafson wrote:
 You're totally right (blush). Do you think it's worth pulling this 
 functionality into a separate class?

I probably wouldn't until we have a reason to reuse it. It's only 2 fields and 
2 methods.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review88451
---


On June 18, 2015, 9:40 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 18, 2015, 9:40 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; address more review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 951c34c92710fc4b38d656e99d2a41255c60aeb7 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  41cb9458f51875ac9418fce52f264b35adba92f4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
 f73eedb030987f018d8446bb1dcd98d19fa97331 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  1454ab73df22cce028f41f74b970628829da4e9d 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-16 Thread Jason Gustafson


 On June 15, 2015, 6:09 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1199
  https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line1199
 
  Doesn't this cause poll() to block for the backoff time?

Generally it wouldn't, but there are a couple cases where it could. For 
example, we could be waiting to discover the coordinator, or we might need to 
fetch some partition offsets. In either case, we could block longer than the 
poll timeout. Note that this is consistent with the old version. I had hoped to 
address it as part of solving KAFKA-1894.


 On June 15, 2015, 6:09 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 928
  https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line928
 
  Could you explain a bit more how wakeup() can be used for commit 
  offsets or seek to a new position? Does a separate thread have to first 
  save some partitions for committing offsets in the poll thread and then 
  wake it up? Then the poll thread will initiate the offset commit before the 
  next poll request?

It's admittedly a little awkward, but you could do it the way you suggest. For 
example, you could use a flag and check it in the poll thread:

```
while (true) {
  if (needCommit.get()) {
consumer.commit(offsets)
needCommit.set(false)
  }
  
  records = consumer.poll(5)
  // submit records to executor perhaps
}
```

Then the other thread just needs to set the flag and offsets that need 
committing and wakeup the consumer.

Alternatively, you could protect the consumer with a lock and commit the 
offsets from a separate thread. The difficulty with this solution is that you 
need to make sure that the polling thread does not simply reacquire the lock 
after being woken up. This problem can be dealt with by introducing a second 
lock or perhaps by getting tricky with a semaphore, but I'd probably recommend 
the previous solution instead since it's easier to understand.


 On June 15, 2015, 6:09 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 686-687
  https://reviews.apache.org/r/34789/diff/9/?file=983137#file983137line686
 
  It's not very clear to me if the caller is woken up from a poll() call, 
  whether the next poll() call will resume from the exact same state the 
  previous one is left off.
  
  For example, let's say the first pool() call just issued a 
  consumerMetadataRequest in ensureCoordinatorKnown() and then is woken up. 
  In the next poll() call, do we just wait for the response from the previous 
  consumerMetadataRequest or will it trigger a new consumerMetadataRequest? 
  If it's the latter, will the two responses be processed out of order?

Waking up the consumer can leave requests pending. This could also happen if we 
change the code to actually respect the poll timeout (and I think this is the 
main reason why it hasn't been done). I was thinking about wakeup() as 
primarily a way to get timely shutdown behavior, but if it is also used for 
commits or seeks, then we have to solve this problem. I have thought about 
adding a cancel() operation to the RequestFuture. It wouldn't stop an in-flight 
request, but you could prevent its callback from being executed. This would 
probably be good enough to deal with a pending consumerMetadataRequest, but we 
wouldn't want to leave a JoinGroup request pending since resending it may cause 
unnecessary rebalancing. Perhaps we shouldn't allow wakeups in those cases? 
Maybe the only case we actually wakeup from is when we are polling for data?


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87880
---


On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 11, 2015, 9:10 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-15 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87880
---


Thanks for the new patch. A few more quick comments.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140366

It's not very clear to me if the caller is woken up from a poll() call, 
whether the next poll() call will resume from the exact same state the previous 
one is left off.

For example, let's say the first pool() call just issued a 
consumerMetadataRequest in ensureCoordinatorKnown() and then is woken up. In 
the next poll() call, do we just wait for the response from the previous 
consumerMetadataRequest or will it trigger a new consumerMetadataRequest? If 
it's the latter, will the two responses be processed out of order?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140293

typo: this do



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140353

Could you explain a bit more how wakeup() can be used for commit offsets or 
seek to a new position? Does a separate thread have to first save some 
partitions for committing offsets in the poll thread and then wake it up? Then 
the poll thread will initiate the offset commit before the next poll request?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140346

Doesn't this cause poll() to block for the backoff time?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment140345

It seems that the expectation is that if the request times out, any 
callbacks registered with the previously submitted requests will no longer be 
called. Perhaps we can add this in the TODO comment.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
https://reviews.apache.org/r/34789/#comment140349

To be consistent with the naming of the request, perhaps it's better to 
name this listOffset and the next one handleListOffsetResponse.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
https://reviews.apache.org/r/34789/#comment140344

Incomplete comment after but a.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
https://reviews.apache.org/r/34789/#comment140347

This seems never used. Is there a use case for this?


- Jun Rao


On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 11, 2015, 9:10 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-11 Thread Jason Gustafson


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 797-798
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797
 
  Hmm, seekToBegining() is supposed to be a blocking call. Basically, at 
  the end of the call, we expect the fetch offset to be set to the beginning. 
  This is now changed to async, which doesn't match the intended behavior. We 
  need to think through if this matters or not.
  
  Ditto for seekToEnd().
 
 Jason Gustafson wrote:
 Since we always update fetch positions before a new fetch and in 
 position(), it didn't seem necessary to make it synchronous. I thought this 
 handling might be more consistent with how new subscriptions are handled 
 (which are asynchronous and defer the initial offset fetch until the next 
 poll or position). That being said, I don't have a strong feeling about it, 
 so we could return to the blocking version.
 
 Jun Rao wrote:
 Making this async may be fine. One implication is that we call position() 
 immediately after seekToBeginning(), we may not be able to get the correct 
 offset.
 
 Jason Gustafson wrote:
 We should be able to get the right offset since we always update offsets 
 before returning the current position, but we might have to block for it. 
 It's similar to if you call subscribe(topic) and then try to get its position 
 immediately.
 
 Jun Rao wrote:
 That may work. However, if one calls seekToBegining() followed by 
 seekToEnd(), will we guarantee that position() returns the end offset?

Yes, this will work. The latest seek will overwrite any pending ones.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 319-322
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line319
 
  Could we add an example of how to use the new wakeup() call, especially 
  with closing the consumer properly? For example, does the consumer thread 
  just catch the ConsumerWakeupException and then call close()?

I've added an example in the latest patch.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1039-1040
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039
 
  The returned response may be ready already after the offsetBefore call 
  due to needing metadata refresh. Since we don't check the ready state 
  immediately afterward, we may be delaying the processing of metadata 
  refresh by the request timeout.
 
 Jason Gustafson wrote:
 This is a pretty good point. One of the reasons working with 
 NetworkClient is tricky is that you need several polls to complete a request: 
 one to connect, one to send, and one to receive. In this case, the result 
 might not be ready because we are in the middle of connecting to the broker, 
 in which case we need to call poll() to finish the connect. If we don't, then 
 then next request will just fail for the same reason. I'll look to see if 
 there's a way to fix this to avoid unnecessary calls to poll.

I struggled a bit trying to fix this. In the latest patch, I changed the notion 
of remedy to a retryAction and included polling as one of the possible 
actions. Then if the result is finished, we only would call poll if the result 
indicates that it's needed. The only case where I actually use this is when a 
connection has just been initiated.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 11, 2015, 9:10 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-11 Thread Jason Gustafson


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java,
   line 15
  https://reviews.apache.org/r/34789/diff/3/?file=976967#file976967line15
 
  The classes named XResponse may be a bit confusing because the protocol 
  responses use that terminology. Future? Result?
 
 Jason Gustafson wrote:
 Agreed. In fact, they were XResult initially. I changed them because 
 BrokerResult and CoordinatorResult didn't seems to suggest as clearly what 
 they were for as BrokerResponse and CoordinatorResponse. I considered Future 
 as well, but its usage is a bit different than traditional Java Futures. 
 Perhaps XReply?
 
 Ewen Cheslack-Postava wrote:
 Even though there's no blocking get(), XFuture might be the clearest. 
 XReply would work, but has a similar issue that it gets confusing whether 
 XResponse or XReply is the actual message received back vs. the processed 
 data that you wanted to extract.

In the latest patch, I changed DelayedResult to RequestFuture. Think that's 
better?


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
---


On June 11, 2015, 9:10 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 11, 2015, 9:10 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 KAFKA-2168; address review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-11 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 11, 2015, 9:10 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


KAFKA-2168; fix rebase error and checkstyle issue


KAFKA-2168; address review comments and add docs


KAFKA-2168; handle polling with timeout 0


KAFKA-2168; timeout=0 means return immediately


KAFKA-2168; address review comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d1d1ec178f60dc47d408f52a89e52886c1a093a2 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Ewen Cheslack-Postava


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java,
   line 16
  https://reviews.apache.org/r/34789/diff/8/?file=980077#file980077line16
 
  Do we need NONE?
 
 Jason Gustafson wrote:
 It was there before, but I don't think it's actually used. I'd be fine 
 removing it.

You need it to properly parse the none config value and so there is an 
OffsetResetStrategy value to indicate that option. It's just not currently used 
because you handle it with an else rather than an else if.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java,
   line 196
  https://reviews.apache.org/r/34789/diff/8/?file=980084#file980084line196
 
  To be consistent with the naming convention with the rest of the 
  methods, should we just name it offsetRestNeeded()?
 
 Jason Gustafson wrote:
 Haha, I actually used that convention initially, but it was a little 
 confusing at times which method should be used. I can change it back, or we 
 can add the is prefix to the other usages. Preferences?

I think I've seen a number of other places where we're not exactly consistent 
with this (grep for [.]is for lots of examples). Naming conventions seem to 
be quite mixed between this type, get/set style, and just bare names. Not sure 
it's worth worrying about beyond readability issues.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jun Rao


 On June 9, 2015, 6:49 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 322
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line322
 
  Do we need this? There is no real guarantee on the poll time, so it 
  seems that we could just return when wakeup is called.
 
 Jason Gustafson wrote:
 You might be waking up from a synchronous commit, for example. In that 
 case, all we can do is raise an exception. We could alternatively say that 
 wakeup only applies to the poll() method and cannot be used to interrupt the 
 other calls.

If poll just returns on wakeup, how would the caller know if there is an 
intention to close the consumer?


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87257
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jun Rao


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1212
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212
 
  -1 makes the pollClient block forever. So, we don't get a chance to do 
  the wakeup check.
 
 Jason Gustafson wrote:
 I might be wrong, but I think we can still use NetworkClient.wakeup to 
 interrupt a poll call which is waiting forever.
 
 Jun Rao wrote:
 Yes, you are right. I missed that.

Actually, this seems to be still a problem. The issus is that when 
NetworkClient.wakeup is made, NetworkClient.poll may not be called. In the 
case, the next NetworkClient.poll may still block for the timeout.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jason Gustafson


 On June 9, 2015, 6:49 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 322
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line322
 
  Do we need this? There is no real guarantee on the poll time, so it 
  seems that we could just return when wakeup is called.
 
 Jason Gustafson wrote:
 You might be waking up from a synchronous commit, for example. In that 
 case, all we can do is raise an exception. We could alternatively say that 
 wakeup only applies to the poll() method and cannot be used to interrupt the 
 other calls.
 
 Jun Rao wrote:
 If poll just returns on wakeup, how would the caller know if there is an 
 intention to close the consumer?

I think the user would have to use a separate flag to indicate their intention 
to close. Then their shutdown hook would first set the flag, then call 
wakeup(). The polling thread then might look like this:


```
KakfaConsumer consumer = new KafkaConsumer()
try {
  consumer.subscribe(foo);
  
  while (!closed) {
records = consumer.poll(5000)

// Do something with the records
  }
} finally {
  consumer.close();
}
```


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87257
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jun Rao


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1212
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212
 
  -1 makes the pollClient block forever. So, we don't get a chance to do 
  the wakeup check.
 
 Jason Gustafson wrote:
 I might be wrong, but I think we can still use NetworkClient.wakeup to 
 interrupt a poll call which is waiting forever.

Yes, you are right. I missed that.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 797-798
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797
 
  Hmm, seekToBegining() is supposed to be a blocking call. Basically, at 
  the end of the call, we expect the fetch offset to be set to the beginning. 
  This is now changed to async, which doesn't match the intended behavior. We 
  need to think through if this matters or not.
  
  Ditto for seekToEnd().
 
 Jason Gustafson wrote:
 Since we always update fetch positions before a new fetch and in 
 position(), it didn't seem necessary to make it synchronous. I thought this 
 handling might be more consistent with how new subscriptions are handled 
 (which are asynchronous and defer the initial offset fetch until the next 
 poll or position). That being said, I don't have a strong feeling about it, 
 so we could return to the blocking version.

Making this async may be fine. One implication is that we call position() 
immediately after seekToBeginning(), we may not be able to get the correct 
offset.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jason Gustafson


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1212
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212
 
  -1 makes the pollClient block forever. So, we don't get a chance to do 
  the wakeup check.
 
 Jason Gustafson wrote:
 I might be wrong, but I think we can still use NetworkClient.wakeup to 
 interrupt a poll call which is waiting forever.
 
 Jun Rao wrote:
 Yes, you are right. I missed that.
 
 Jun Rao wrote:
 Actually, this seems to be still a problem. The issus is that when 
 NetworkClient.wakeup is made, NetworkClient.poll may not be called. In the 
 case, the next NetworkClient.poll may still block for the timeout.

From the javadocs for Selector, the wakeup will apply to the next poll if one 
is not in progress. But perhaps we should just check the wakeup flag before 
entering the poll to be safe.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 797-798
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797
 
  Hmm, seekToBegining() is supposed to be a blocking call. Basically, at 
  the end of the call, we expect the fetch offset to be set to the beginning. 
  This is now changed to async, which doesn't match the intended behavior. We 
  need to think through if this matters or not.
  
  Ditto for seekToEnd().
 
 Jason Gustafson wrote:
 Since we always update fetch positions before a new fetch and in 
 position(), it didn't seem necessary to make it synchronous. I thought this 
 handling might be more consistent with how new subscriptions are handled 
 (which are asynchronous and defer the initial offset fetch until the next 
 poll or position). That being said, I don't have a strong feeling about it, 
 so we could return to the blocking version.
 
 Jun Rao wrote:
 Making this async may be fine. One implication is that we call position() 
 immediately after seekToBeginning(), we may not be able to get the correct 
 offset.

We should be able to get the right offset since we always update offsets before 
returning the current position, but we might have to block for it. It's similar 
to if you call subscribe(topic) and then try to get its position immediately.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-10 Thread Jun Rao


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1212
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212
 
  -1 makes the pollClient block forever. So, we don't get a chance to do 
  the wakeup check.
 
 Jason Gustafson wrote:
 I might be wrong, but I think we can still use NetworkClient.wakeup to 
 interrupt a poll call which is waiting forever.
 
 Jun Rao wrote:
 Yes, you are right. I missed that.
 
 Jun Rao wrote:
 Actually, this seems to be still a problem. The issus is that when 
 NetworkClient.wakeup is made, NetworkClient.poll may not be called. In the 
 case, the next NetworkClient.poll may still block for the timeout.
 
 Jason Gustafson wrote:
 From the javadocs for Selector, the wakeup will apply to the next poll if 
 one is not in progress. But perhaps we should just check the wakeup flag 
 before entering the poll to be safe.

Yes, in that case, this is not an issue. We probaly don't have to check the 
wakeup flag before the poll call since the flag could change immediately after 
the check.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 797-798
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797
 
  Hmm, seekToBegining() is supposed to be a blocking call. Basically, at 
  the end of the call, we expect the fetch offset to be set to the beginning. 
  This is now changed to async, which doesn't match the intended behavior. We 
  need to think through if this matters or not.
  
  Ditto for seekToEnd().
 
 Jason Gustafson wrote:
 Since we always update fetch positions before a new fetch and in 
 position(), it didn't seem necessary to make it synchronous. I thought this 
 handling might be more consistent with how new subscriptions are handled 
 (which are asynchronous and defer the initial offset fetch until the next 
 poll or position). That being said, I don't have a strong feeling about it, 
 so we could return to the blocking version.
 
 Jun Rao wrote:
 Making this async may be fine. One implication is that we call position() 
 immediately after seekToBeginning(), we may not be able to get the correct 
 offset.
 
 Jason Gustafson wrote:
 We should be able to get the right offset since we always update offsets 
 before returning the current position, but we might have to block for it. 
 It's similar to if you call subscribe(topic) and then try to get its position 
 immediately.

That may work. However, if one calls seekToBegining() followed by seekToEnd(), 
will we guarantee that position() returns the end offset?


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-09 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87257
---



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139570

Do we need this? There is no real guarantee on the poll time, so it seems 
that we could just return when wakeup is called.


- Jay Kreps


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-09 Thread Jason Gustafson


 On June 9, 2015, 6:49 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 322
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line322
 
  Do we need this? There is no real guarantee on the poll time, so it 
  seems that we could just return when wakeup is called.

You might be waking up from a synchronous commit, for example. In that case, 
all we can do is raise an exception. We could alternatively say that wakeup 
only applies to the poll() method and cannot be used to interrupt the other 
calls.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87257
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-09 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


Thanks for the patch. A few comments below.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139565

Could we add an example of how to use the new wakeup() call, especially 
with closing the consumer properly? For example, does the consumer thread just 
catch the ConsumerWakeupException and then call close()?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139574

Hmm, seekToBegining() is supposed to be a blocking call. Basically, at the 
end of the call, we expect the fetch offset to be set to the beginning. This is 
now changed to async, which doesn't match the intended behavior. We need to 
think through if this matters or not.

Ditto for seekToEnd().



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139577

Should we pass in tp to isOffsetResetNeeded()?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139509

The returned response may be ready already after the offsetBefore call due 
to needing metadata refresh. Since we don't check the ready state immediately 
afterward, we may be delaying the processing of metadata refresh by the request 
timeout.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139564

Currently, our coding convention is not to wrap single line statement with 
{}. There are a few other cases like this.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139544

In the async mode, response may not be ready in the first iteration. Are we 
handling the retry properly in that case?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139545

We probably need to make some changes here when KAFKA-2120 is done to 
handle the request timeout propertly. Perhaps we can add a TODO comment here.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139563

-1 makes the pollClient block forever. So, we don't get a chance to do the 
wakeup check.



clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
https://reviews.apache.org/r/34789/#comment139588

Do we need NONE?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
https://reviews.apache.org/r/34789/#comment139579

We probably can make it clear that this is a non-blocking call and doesn't 
wait for the request to be sent or the response to be received. It would be 
good to do that on other similar methods too.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
https://reviews.apache.org/r/34789/#comment139537

Should asynchronous by synchronous?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
https://reviews.apache.org/r/34789/#comment139586

Similar as the above, it would be useful to make it clear that 
fetchOffsets() is non-blocking.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
https://reviews.apache.org/r/34789/#comment139534

Perhaps we can define a static method to initalize the constant and set the 
state. It's clearer that way since the instantiation and the initialization are 
in the same place. With this, we probably don't need the static getter methods 
and can just let the caller use the static constants directly.

Ditto for BrokerResult.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
https://reviews.apache.org/r/34789/#comment139540

The comment seems inaccurate. We are not returning an error, but returning 
a remedy instead.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
https://reviews.apache.org/r/34789/#comment139504

Do we need both hasRemedy and hasException? It seems that if one returns 
true, the other should always return false.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
https://reviews.apache.org/r/34789/#comment139578

To be consistent with the naming convention with the rest of the methods, 
should we just name it offsetRestNeeded()?


- Jun Rao


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-09 Thread Jason Gustafson


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 995
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line995
 
  Should we pass in tp to isOffsetResetNeeded()?

Yes, we should. I'll fix it.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 797-798
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797
 
  Hmm, seekToBegining() is supposed to be a blocking call. Basically, at 
  the end of the call, we expect the fetch offset to be set to the beginning. 
  This is now changed to async, which doesn't match the intended behavior. We 
  need to think through if this matters or not.
  
  Ditto for seekToEnd().

Since we always update fetch positions before a new fetch and in position(), it 
didn't seem necessary to make it synchronous. I thought this handling might be 
more consistent with how new subscriptions are handled (which are asynchronous 
and defer the initial offset fetch until the next poll or position). That being 
said, I don't have a strong feeling about it, so we could return to the 
blocking version.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1039-1040
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039
 
  The returned response may be ready already after the offsetBefore call 
  due to needing metadata refresh. Since we don't check the ready state 
  immediately afterward, we may be delaying the processing of metadata 
  refresh by the request timeout.

This is a pretty good point. One of the reasons working with NetworkClient is 
tricky is that you need several polls to complete a request: one to connect, 
one to send, and one to receive. In this case, the result might not be ready 
because we are in the middle of connecting to the broker, in which case we need 
to call poll() to finish the connect. If we don't, then then next request will 
just fail for the same reason. I'll look to see if there's a way to fix this to 
avoid unnecessary calls to poll.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1139-1141
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1139
 
  In the async mode, response may not be ready in the first iteration. 
  Are we handling the retry properly in that case?

When an async commit request fails, we do not retry, which is consistent with 
the current consumer. I think Ewen's patch for KAFKA-2123 introduces a good 
approach to retrying async commits. My own preference is to let them fail fast 
as long as the user has the callback from KAFKA-2123 to handle their failure. 
Otherwise, it gets a little tricky trying to preserve their order.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1195
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1195
 
  We probably need to make some changes here when KAFKA-2120 is done to 
  handle the request timeout propertly. Perhaps we can add a TODO comment 
  here.

I'll add a note. This also falls in the purview of KAFKA-1894.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java,
   line 16
  https://reviews.apache.org/r/34789/diff/8/?file=980077#file980077line16
 
  Do we need NONE?

It was there before, but I don't think it's actually used. I'd be fine removing 
it.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java,
   lines 20-27
  https://reviews.apache.org/r/34789/diff/8/?file=980080#file980080line20
 
  Perhaps we can define a static method to initalize the constant and set 
  the state. It's clearer that way since the instantiation and the 
  initialization are in the same place. With this, we probably don't need the 
  static getter methods and can just let the caller use the static constants 
  directly.
  
  Ditto for BrokerResult.

Agreed. I'll fix it.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1212
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212
 
  -1 makes the pollClient block forever. So, we don't get a chance to do 
  the wakeup check.

I might be wrong, but I think we can still use NetworkClient.wakeup to 
interrupt a poll call which is waiting forever.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1078-1080
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1078
 
  

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-05 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 5, 2015, 7:02 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


KAFKA-2168; fix rebase error and checkstyle issue


KAFKA-2168; address review comments and add docs


KAFKA-2168; handle polling with timeout 0


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d1d1ec178f60dc47d408f52a89e52886c1a093a2 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-05 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 5, 2015, 7:45 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


KAFKA-2168; fix rebase error and checkstyle issue


KAFKA-2168; address review comments and add docs


KAFKA-2168; handle polling with timeout 0


KAFKA-2168; timeout=0 means return immediately


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d1d1ec178f60dc47d408f52a89e52886c1a093a2 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-04 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 4, 2015, 9:36 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


KAFKA-2168; fix rebase error and checkstyle issue


KAFKA-2168; address review comments and add docs


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d1d1ec178f60dc47d408f52a89e52886c1a093a2 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-04 Thread Jason Gustafson


 On June 4, 2015, 6:21 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 671
  https://reviews.apache.org/r/34789/diff/5/?file=978185#file978185line671
 
  It looks like these were already issues before this code was 
  rearranged. Are there JIRAs filed for them yet?

I think they're covered by KAFKA-1894.


 On June 4, 2015, 6:21 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1110
  https://reviews.apache.org/r/34789/diff/5/?file=978185#file978185line1110
 
  This is definitely optional, but it might makes sense to just inline 
  this to discourage anyone from using it anywhere else -- this is really the 
  only place it should be needed.

I've removed this since it's not actually needed. If the coordinator is not 
known, the request will fail immediately and ensureCoordinator will then be 
invoked.


 On June 4, 2015, 6:21 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 391
  https://reviews.apache.org/r/34789/diff/5/?file=978185#file978185line391
 
  If this is going to be left to be fixed with the patch for KIP-19, we 
  should make sure that's noted in the associated JIRA.

I've added a comment to KAFKA-2120.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86651
---


On June 4, 2015, 9:36 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 4, 2015, 9:36 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-04 Thread Ewen Cheslack-Postava


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1091
  https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1091
 
  Since poll() can trigger auto offset commits, and then the commits can 
  block while polling() for some time, can we end up recursing in some bad 
  situations, e.g. if we consistently cannot get a coordinator?
  
  We might need to keep track if a commit is outstanding and not try to 
  commit again, or just update the values we're trying to commit.

To clarify the issue: in poll(timeout), we check if it is time to autocommit 
and call commit, which then calls commitOffsets. In commitOffsets, we have a 
while(true) loop and in it we poll for both sync and async. If that polling 
process takes long enough, then we could hit the next interval and those poll() 
calls could trigger another call to commit. Now we have 2 calls to commit on 
the stack.

I don't think this is likely, and I'm not certain there's a condition where you 
can get stuck in the loop that long when using async commits. But since the 
logic in commitOffsets current checks response.isReady first, *then* breaks if 
its async, I thought it might be possible that during a connectivity issue with 
the coordinator, you might just get stuck in this loop even in async commit 
mode and trigger this recursive commit behavior (given a long enough 
outage/short enough auto commit interval).


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java,
   line 15
  https://reviews.apache.org/r/34789/diff/3/?file=976967#file976967line15
 
  The classes named XResponse may be a bit confusing because the protocol 
  responses use that terminology. Future? Result?
 
 Jason Gustafson wrote:
 Agreed. In fact, they were XResult initially. I changed them because 
 BrokerResult and CoordinatorResult didn't seems to suggest as clearly what 
 they were for as BrokerResponse and CoordinatorResponse. I considered Future 
 as well, but its usage is a bit different than traditional Java Futures. 
 Perhaps XReply?

Even though there's no blocking get(), XFuture might be the clearest. XReply 
would work, but has a similar issue that it gets confusing whether XResponse or 
XReply is the actual message received back vs. the processed data that you 
wanted to extract.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
---


On June 4, 2015, 4:07 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 4, 2015, 4:07 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2186; fix rebase error and checkstyle issue
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  fac79951d50ef6f19cef5fe62cbc4582b27b145a 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  c5e577ff98bea3de65e290d30065935a29b3247f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-04 Thread Jason Gustafson


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1091
  https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1091
 
  Since poll() can trigger auto offset commits, and then the commits can 
  block while polling() for some time, can we end up recursing in some bad 
  situations, e.g. if we consistently cannot get a coordinator?
  
  We might need to keep track if a commit is outstanding and not try to 
  commit again, or just update the values we're trying to commit.
 
 Ewen Cheslack-Postava wrote:
 To clarify the issue: in poll(timeout), we check if it is time to 
 autocommit and call commit, which then calls commitOffsets. In commitOffsets, 
 we have a while(true) loop and in it we poll for both sync and async. If that 
 polling process takes long enough, then we could hit the next interval and 
 those poll() calls could trigger another call to commit. Now we have 2 calls 
 to commit on the stack.
 
 I don't think this is likely, and I'm not certain there's a condition 
 where you can get stuck in the loop that long when using async commits. But 
 since the logic in commitOffsets current checks response.isReady first, 
 *then* breaks if its async, I thought it might be possible that during a 
 connectivity issue with the coordinator, you might just get stuck in this 
 loop even in async commit mode and trigger this recursive commit behavior 
 (given a long enough outage/short enough auto commit interval).

I think there might be some confusion caused by the the overloading of the poll 
method. The poll that is called in commitOffsets is basically just a wrapper 
around client.poll() and doesn't recurse. Perhaps I can rename these methods to 
clientPoll() to make this clearer?


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
---


On June 4, 2015, 4:07 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 4, 2015, 4:07 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2186; fix rebase error and checkstyle issue
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  fac79951d50ef6f19cef5fe62cbc4582b27b145a 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  c5e577ff98bea3de65e290d30065935a29b3247f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-04 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86651
---



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138705

If this is going to be left to be fixed with the patch for KIP-19, we 
should make sure that's noted in the associated JIRA.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138706

It looks like these were already issues before this code was rearranged. 
Are there JIRAs filed for them yet?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138744

javadoc is out of data - strategy is no longer an argument here



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138743

This is definitely optional, but it might makes sense to just inline this 
to discourage anyone from using it anywhere else -- this is really the only 
place it should be needed.


LGTM once rebased against trunk, only had a few minor comments. The naming of 
the DelayedResult classes could be changed, but I think none of the options we 
have are entirely clear so I wouldn't worry about that naming too much.

- Ewen Cheslack-Postava


On June 4, 2015, 4:07 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 4, 2015, 4:07 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2186; fix rebase error and checkstyle issue
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  fac79951d50ef6f19cef5fe62cbc4582b27b145a 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  c5e577ff98bea3de65e290d30065935a29b3247f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-06-04 Thread Ewen Cheslack-Postava


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1091
  https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1091
 
  Since poll() can trigger auto offset commits, and then the commits can 
  block while polling() for some time, can we end up recursing in some bad 
  situations, e.g. if we consistently cannot get a coordinator?
  
  We might need to keep track if a commit is outstanding and not try to 
  commit again, or just update the values we're trying to commit.
 
 Ewen Cheslack-Postava wrote:
 To clarify the issue: in poll(timeout), we check if it is time to 
 autocommit and call commit, which then calls commitOffsets. In commitOffsets, 
 we have a while(true) loop and in it we poll for both sync and async. If that 
 polling process takes long enough, then we could hit the next interval and 
 those poll() calls could trigger another call to commit. Now we have 2 calls 
 to commit on the stack.
 
 I don't think this is likely, and I'm not certain there's a condition 
 where you can get stuck in the loop that long when using async commits. But 
 since the logic in commitOffsets current checks response.isReady first, 
 *then* breaks if its async, I thought it might be possible that during a 
 connectivity issue with the coordinator, you might just get stuck in this 
 loop even in async commit mode and trigger this recursive commit behavior 
 (given a long enough outage/short enough auto commit interval).
 
 Jason Gustafson wrote:
 I think there might be some confusion caused by the the overloading of 
 the poll method. The poll that is called in commitOffsets is basically just a 
 wrapper around client.poll() and doesn't recurse. Perhaps I can rename these 
 methods to clientPoll() to make this clearer?

Ah, yes, that makes sense. I was getting a bit confused when reading some of 
the code about which poll was being invoked. Adjusting the names would probably 
help.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
---


On June 4, 2015, 4:07 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 4, 2015, 4:07 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2186; fix rebase error and checkstyle issue
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  fac79951d50ef6f19cef5fe62cbc4582b27b145a 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  c5e577ff98bea3de65e290d30065935a29b3247f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 


Re: Review Request 34789: Patch for KAFKA-2168

2015-06-03 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
---


This looks good so far. I think it's much easier to understand when all the 
blocking stuff happens at the KafkaConsumer level and each of the classes it 
uses only ever handles single requests. It'd be nice to document the basic 
architecture somewhere since it took me a bit to fully figure it out. 
(Unfortunately, since the javadocs for the consumer are in the implementation 
class KafkaConsumer instead of on the Consumer interface, we can't put this 
with the KafkaConsumer class...)

Some notes in addition to the inline stuff:

Some functionality has been pulled back up to KafkaConsumer, in a mild reversal 
of Guozhang's refactoring. It'd be nice to keep this to a minimum. The ones 
that stuck out to me were resetOffsets()/resetOffset()/offsetBefore(). I'm 
guessing you also couldn't figure out a way to keep it in Fetcher since the 
inner call to offsetBefore() requires that blocking loop?

Some handling of DelayedResponses and its subclasses seem redundant/follows a 
common pattern and maybe could be refactored into utility code. However, there 
are few enough places it's happening now that I don't think it's a big deal. It 
does seem a bit wasteful that we have to continually create these 
DelayedResponse objects even in cases where we know we'll fail fast, but I 
suppose those cases should be unusual and the cost to allocate them isn't all 
that high.

Finally, a readability/cleanliness thing. This patch adds more nested anonymous 
RequestCompletionHandler classes. I think these are fine as they are, but if 
the implementations get too long or branchy with all the various error 
conditions they can become unreadably over-indented. Taking some of the big 
ones and using named nested classes might help improve clarity, although it 
does separate the request initiating code from the response handling code.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138296

This seems like it'll be analogous to KIP-19's request.timeout.ms since it 
looks like all the use cases are sending a single request and waiting for a 
single response. I don't think there's any patch ready for that yet (check w/ 
the JIRA and maybe Jiangjie has something that hasn't been submitted yet), but 
if that ends up accepted we could potentially add that flag in either patch.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138299

Might want to move this to trace. Normal consumers are going to hit this *a 
lot*



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138304

I think this is simpler and clearer if you reorder these two -- reset the 
offsets that need it first, then update fetch positions for ones that are 
missing it. I think this removes the extra conditional 
!subscriptions.offsetResetNeeded(tp) that was added in updateFetchPositions too.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138305

Any reason not to do this? Definitely seems like it'd be necessary although 
the logic might be more complicated than just moving maybeHearbeat() in here -- 
some of the other lead up to this could change if you saw a partition 
reassignment in the middle of a long poll, requiring fetch positions to be 
updated, etc.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138320

Besides grouping by node, we could also send these requests out in 
parallel. The drawback to simplifying this all to use a series off 
offsetBefore() calls is that each blocks, so resetting a bunch of offsets is 
going to be pretty slow.

Obvious solution is a utility that lets you run a bunch of requests in 
parallel, then do the same looping you're doing waiting for a single response 
but handle a bunch all at once.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138324

Since poll() can trigger auto offset commits, and then the commits can 
block while polling() for some time, can we end up recursing in some bad 
situations, e.g. if we consistently cannot get a coordinator?

We might need to keep track if a commit is outstanding and not try to 
commit again, or just update the values we're trying to commit.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment138323

In async mode, why bother polling at all here? You mentioned coordinator 
connection issues in the comment below, but if we need to 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-03 Thread Jason Gustafson


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  This looks good so far. I think it's much easier to understand when all the 
  blocking stuff happens at the KafkaConsumer level and each of the classes 
  it uses only ever handles single requests. It'd be nice to document the 
  basic architecture somewhere since it took me a bit to fully figure it out. 
  (Unfortunately, since the javadocs for the consumer are in the 
  implementation class KafkaConsumer instead of on the Consumer interface, we 
  can't put this with the KafkaConsumer class...)
  
  Some notes in addition to the inline stuff:
  
  Some functionality has been pulled back up to KafkaConsumer, in a mild 
  reversal of Guozhang's refactoring. It'd be nice to keep this to a minimum. 
  The ones that stuck out to me were 
  resetOffsets()/resetOffset()/offsetBefore(). I'm guessing you also couldn't 
  figure out a way to keep it in Fetcher since the inner call to 
  offsetBefore() requires that blocking loop?
  
  Some handling of DelayedResponses and its subclasses seem redundant/follows 
  a common pattern and maybe could be refactored into utility code. However, 
  there are few enough places it's happening now that I don't think it's a 
  big deal. It does seem a bit wasteful that we have to continually create 
  these DelayedResponse objects even in cases where we know we'll fail fast, 
  but I suppose those cases should be unusual and the cost to allocate them 
  isn't all that high.
  
  Finally, a readability/cleanliness thing. This patch adds more nested 
  anonymous RequestCompletionHandler classes. I think these are fine as they 
  are, but if the implementations get too long or branchy with all the 
  various error conditions they can become unreadably over-indented. Taking 
  some of the big ones and using named nested classes might help improve 
  clarity, although it does separate the request initiating code from the 
  response handling code.

Yeah, that's right. The offsetBefore method was the tricky one to deal with. I 
tried to keep the lower level logic in Fetcher, but resetOffsets ended up 
leaking into KafkaConsumer. That was before I started using the DelayedResponse 
interface though, so there may be a way to move it back to Fetcher.

We could definitely reduce some of the DelayedResponse object creation using 
several static instances. I think there are only a couple cases where it is 
actually necessary to have a new instance. I also think the usage is currently 
a little nasty since you have to make sure that the delayed response is 
finished through all code paths. That could 

As you mention, it's kind of nice having the handler code right there in the 
method. I actually kind of like the pattern used in some other cases where 
instead of a nested class, an anonymous class is created which simply delegates 
to a handler method.


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 683
  https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line683
 
  I think this is simpler and clearer if you reorder these two -- reset 
  the offsets that need it first, then update fetch positions for ones that 
  are missing it. I think this removes the extra conditional 
  !subscriptions.offsetResetNeeded(tp) that was added in updateFetchPositions 
  too.

Yeah, I agree. I'll have to rework the code a little bit since 
updateFetchPositions can lead to an offset reset (in the case that the fetched 
and committed positions are both null), but I can probably handle that case in 
a separate method.


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 696
  https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line696
 
  Any reason not to do this? Definitely seems like it'd be necessary 
  although the logic might be more complicated than just moving 
  maybeHearbeat() in here -- some of the other lead up to this could change 
  if you saw a partition reassignment in the middle of a long poll, requiring 
  fetch positions to be updated, etc.

No particular reason. I just hadn't thought it all the way through yet. Perhaps 
to make it easy, we could just exit the poll call if we need a partition 
reassignment or an offset update?


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 988
  https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line988
 
  Besides grouping by node, we could also send these requests out in 
  parallel. The drawback to simplifying this all to use a series off 
  offsetBefore() calls is that each blocks, so resetting a bunch of offsets 
  is going to be pretty slow.
  
  Obvious solution is a utility that lets you run a bunch of requests in 
  parallel, then do the same 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-03 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 4, 2015, 1:21 a.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 fac79951d50ef6f19cef5fe62cbc4582b27b145a 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
c5e577ff98bea3de65e290d30065935a29b3247f 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-03 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 4, 2015, 4:07 a.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


KAFKA-2168; address review comments


KAFKA-2186; fix rebase error and checkstyle issue


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 fac79951d50ef6f19cef5fe62cbc4582b27b145a 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
c5e577ff98bea3de65e290d30065935a29b3247f 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-03 Thread Jason Gustafson


 On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
  This looks good so far. I think it's much easier to understand when all the 
  blocking stuff happens at the KafkaConsumer level and each of the classes 
  it uses only ever handles single requests. It'd be nice to document the 
  basic architecture somewhere since it took me a bit to fully figure it out. 
  (Unfortunately, since the javadocs for the consumer are in the 
  implementation class KafkaConsumer instead of on the Consumer interface, we 
  can't put this with the KafkaConsumer class...)
  
  Some notes in addition to the inline stuff:
  
  Some functionality has been pulled back up to KafkaConsumer, in a mild 
  reversal of Guozhang's refactoring. It'd be nice to keep this to a minimum. 
  The ones that stuck out to me were 
  resetOffsets()/resetOffset()/offsetBefore(). I'm guessing you also couldn't 
  figure out a way to keep it in Fetcher since the inner call to 
  offsetBefore() requires that blocking loop?
  
  Some handling of DelayedResponses and its subclasses seem redundant/follows 
  a common pattern and maybe could be refactored into utility code. However, 
  there are few enough places it's happening now that I don't think it's a 
  big deal. It does seem a bit wasteful that we have to continually create 
  these DelayedResponse objects even in cases where we know we'll fail fast, 
  but I suppose those cases should be unusual and the cost to allocate them 
  isn't all that high.
  
  Finally, a readability/cleanliness thing. This patch adds more nested 
  anonymous RequestCompletionHandler classes. I think these are fine as they 
  are, but if the implementations get too long or branchy with all the 
  various error conditions they can become unreadably over-indented. Taking 
  some of the big ones and using named nested classes might help improve 
  clarity, although it does separate the request initiating code from the 
  response handling code.
 
 Jason Gustafson wrote:
 Yeah, that's right. The offsetBefore method was the tricky one to deal 
 with. I tried to keep the lower level logic in Fetcher, but resetOffsets 
 ended up leaking into KafkaConsumer. That was before I started using the 
 DelayedResponse interface though, so there may be a way to move it back to 
 Fetcher.
 
 We could definitely reduce some of the DelayedResponse object creation 
 using several static instances. I think there are only a couple cases where 
 it is actually necessary to have a new instance. I also think the usage is 
 currently a little nasty since you have to make sure that the delayed 
 response is finished through all code paths. That could 
 
 As you mention, it's kind of nice having the handler code right there in 
 the method. I actually kind of like the pattern used in some other cases 
 where instead of a nested class, an anonymous class is created which simply 
 delegates to a handler method.

Forgot to finish my thought about the nastiness of DelayedResponse usage... The 
issue is having to always ensure that the DelayedResponse is finished. If it 
doesn't get finished, then the consumer ends up retrying the request (after 
timing out). The code might just need to be restructured a little bit so that 
it's clearer that it does get finished in all cases.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
---


On June 3, 2015, 12:10 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 3, 2015, 12:10 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-02 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 3, 2015, 12:10 a.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; refactored callback handling to prevent unnecessary requests


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
1ca75f83d3667f7d01da1ae2fd9488fb79562364 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 b2764df11afa7a99fce46d1ff48960d889032d14 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResponse.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResponse.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-06-01 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/
---

(Updated June 1, 2015, 11:04 p.m.)


Review request for kafka.


Bugs: KAFKA-2168
https://issues.apache.org/jira/browse/KAFKA-2168


Repository: kafka


Description (updated)
---

KAFKA-2168; move blocking calls into KafkaConsumer to enable async wakeup()


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 b2764df11afa7a99fce46d1ff48960d889032d14 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetchResult.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 cee75410127dd1b86c1156563003216d93a086b3 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 

Diff: https://reviews.apache.org/r/34789/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 34789: Patch for KAFKA-2168

2015-05-28 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review85644
---


I think close() isn't quite right, and is probably harder than just wakeup(). 
Also, I think there are other cases where NetworkClient.poll() is called in a 
loop that aren't handled, e.g. NetworkCLient.completeAll. I'm not sure these 
can be handled without pushing the closed flag into NetworkClient (maybe 
changing the name to closing to allow some operations to continue normally so 
code using NetworkClient can finish up whatever it was doing).


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment137308

Missed removing one of the synchronized keywords.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment137313

I think this requires a bit more coordination between the threads. As 
written, won't this wakeup the selector, but this thread could continue running 
and close metrics/client/serializers before the other thread is done with them?

This gets confusing if we need to support both close() from the poll() 
thread and from another I guess -- in one case you need to wait for another 
thread to finish, in the other you can proceed immediately.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
https://reviews.apache.org/r/34789/#comment137314

Why do these all have ensureNotClosed(), but the KafkaConsumer methods 
don't all have it?


- Ewen Cheslack-Postava


On May 28, 2015, 10:58 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated May 28, 2015, 10:58 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; Remove synchronization of KafkaConsumer to enable non-blocking 
 close
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 34789: Patch for KAFKA-2168

2015-05-28 Thread Jason Gustafson


 On May 28, 2015, 11:30 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 857
  https://reviews.apache.org/r/34789/diff/1/?file=973873#file973873line857
 
  I think this requires a bit more coordination between the threads. As 
  written, won't this wakeup the selector, but this thread could continue 
  running and close metrics/client/serializers before the other thread is 
  done with them?
  
  This gets confusing if we need to support both close() from the poll() 
  thread and from another I guess -- in one case you need to wait for another 
  thread to finish, in the other you can proceed immediately.

Good point. Makes me think that we should bring the syncronization back for 
everything except close. Then we could write close like this:

```
if (client != null) client.wakeup();
synchronized (this) {
  // Shutdown client, serializers, etc.
}
```


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review85644
---


On May 28, 2015, 10:58 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated May 28, 2015, 10:58 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; Remove synchronization of KafkaConsumer to enable non-blocking 
 close
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ClosedConsumerException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  b2764df11afa7a99fce46d1ff48960d889032d14 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson