Re: Review Request 34492: Patch for KAFKA-2210

2015-06-09 Thread Dapeng Sun

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34492/#review86541
---


Hi Parth, thank you for your authorization patch. Could you add inline 
comments when replying to Jun's comments? That would make it easier for 
others to track the discussion.


core/src/main/scala/kafka/common/ErrorMapping.scala
https://reviews.apache.org/r/34492/#comment139453

Why not use 23 here?



core/src/main/scala/kafka/security/auth/Authorizer.scala
https://reviews.apache.org/r/34492/#comment139464

Resource-related operations are authorized, but the authorizer itself does 
not seem to be authorized. Could a normal user operate on the ACLs? We may 
need to add something (e.g. a Session) to addAcls, removeAcls, etc.
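To make the concern concrete, here is a minimal, self-contained sketch (illustrative names only, not the actual KAFKA-2210 API) of threading a Session through the ACL-mutation calls so that addAcl/removeAcl themselves get an authorization check:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: pass the caller's session into ACL mutations so the
// authorizer can reject ACL changes from non-privileged users. All names
// here are illustrative, not the real Kafka Authorizer interface.
public class AclStore {
    public static final class Session {
        final String principal;
        public Session(String principal) { this.principal = principal; }
    }

    private final Set<String> acls = new HashSet<>();

    // Stand-in for a real super-user check
    private boolean isSuperUser(Session s) { return "admin".equals(s.principal); }

    public boolean addAcl(Session session, String acl) {
        if (!isSuperUser(session)) return false;  // reject non-admin callers
        return acls.add(acl);
    }

    public boolean removeAcl(Session session, String acl) {
        if (!isSuperUser(session)) return false;
        return acls.remove(acl);
    }

    public boolean contains(String acl) { return acls.contains(acl); }
}
```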



core/src/main/scala/kafka/security/auth/Authorizer.scala
https://reviews.apache.org/r/34492/#comment139458

I didn't find any code that invokes removeAcls; it seems Acl entries are not 
cleaned up when a resource (e.g. a topic) is deleted. Is that really the case, 
or did I miss something?



core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala
https://reviews.apache.org/r/34492/#comment139452

I think it would be better to use constants or an enum to represent the UserType
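For illustration, a constants/enum approach might look like the following sketch (the UserType values are hypothetical, not the actual KafkaPrincipal definitions):

```java
// Hypothetical sketch: an enum for principal types instead of raw strings,
// so invalid types fail fast at parse time. Values are illustrative only.
public class PrincipalTypes {
    public enum UserType { USER, GROUP, HOST }

    // Parse a string into a UserType; throws IllegalArgumentException
    // for unknown types instead of silently accepting them.
    public static UserType parse(String s) {
        return UserType.valueOf(s.trim().toUpperCase());
    }
}
```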


Thanks
Dapeng

- Dapeng Sun


On June 5, 2015, 7:07 a.m., Parth Brahmbhatt wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34492/
 ---
 
 (Updated June 5, 2015, 7:07 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2210
 https://issues.apache.org/jira/browse/KAFKA-2210
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressing review comments from Jun.
 
 
 Adding CREATE check for offset topic only if the topic does not exist already.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/OffsetRequest.scala 
 3d483bc7518ad76f9548772522751afb4d046b78 
   core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 c75c68589681b2c9d6eba2b440ce5e58cddf6370 
   core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Operation.java PRE-CREATION 
   core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/ResourceType.java PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 387e387998fc3a6c9cb585dab02b5f77b0381fbf 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 6f25afd0e5df98258640252661dee271b1795111 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 e66710d2368334ece66f70d55f57b3f888262620 
   core/src/test/resources/acl.json PRE-CREATION 
   core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
 71f48c07723e334e6489efab500a43fa93a52d0c 
 
 Diff: https://reviews.apache.org/r/34492/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Parth Brahmbhatt
 




Re: [DISCUSS] New consumer offset commit API

2015-06-09 Thread Joel Koshy
Ewen,

Sorry for the late comment, but while we are discussing this API I
think we should also consider the addition of metadata to the offset
commit. This is supported on the broker-side but there is no means in
the current API to include metadata or retention time in the offset
commit.  Ideally, this would also mean changing the committed API to
return the offset + metadata. I realize this is orthogonal to what you
are trying to achieve, so I'm okay with breaking this out - at the
same time I would prefer additional churn to the API.

Thanks,

Joel

On Mon, Apr 27, 2015 at 06:03:05PM -0700, Guozhang Wang wrote:
 Thanks Ewen,
 
 1. I agree with you on the pros of Future; my argument for a pure
 Callback, as I mentioned, is that it seems unlikely to me that users would
 usually want to explicitly set a timeout. That said, if people think this
 scenario is actually common, as in graceful shutdown, then I am OK with a
 Future over just the callback. My concern with Future is that, after
 thinking through the code myself, I feel the implementation would be quite
 complicated.
 
 2. As for infinite retries: for now we have three coordinator-blocking
 calls, one for join-group, one for sync-commit-offsets, and one for
 fetch-offsets. I think for these three it is appropriate to use blocking
 calls, since the consumer cannot proceed anyway without getting the
 responses from these calls, according to their semantics.
 
 3. With the priority mechanism I think we can then get rid of the muting
 mechanism. Not sure if it is possible to implement in
 java.nio.channels.Selector?
 
 Guozhang
 
 
 
 On Thu, Apr 23, 2015 at 2:25 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
 
  Thanks, great feedback everyone.
 
  Jiangjie -- I was worried about interleaving as well. For commits using the
  consumer's own current set of offsets, I agree we could easily limit to 1
  outstanding request so the older one gets cancelled. For commits that
  specify offsets manually, we might not need to do anything special, just
  note in the docs that bad things can happen if you submit two separate
  offset commit requests (i.e. don't wait for callbacks) and have retries
  enabled. Alternatively, we could just serialize them, i.e. always have at
  most 1 outstanding and retries always take priority.
 
  Guozhang -- That breakdown of use cases looks right to me. I agree that
  users of this API wouldn't be trying to use the futures for promise
  pipelining or anything; the only use is to allow them to block on the
  operation. That said, I think there are some tradeoffs to consider
  between the two options:
 
  Pros of Future/Cons of only having callback:
  * Gives control over timeouts. With only callbacks users have to manage
  this themselves if they care about timing. I think there's at least one
  very common case for this: graceful shutdown where I want to try to commit
  offsets, but after some time shutdown whether successful or not.
  * Consistency. This matches the Producer interface and Futures are a
  standard pattern.
 
  Pros of only callback/cons of Future
  * Know up front if it's sync/async. This might simplify the implementation,
  as Guozhang points out. (However, it also means the patch needs to add a
  timeout mechanism, which doesn't exist yet. That's probably not a huge
  patch, but non-trivial. Maybe it needs to be added regardless.)
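For what it's worth, the graceful-shutdown case above can be sketched roughly as follows. The commit is simulated with an executor task; in the real consumer it would be an async offset commit, so names here are illustrative only:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of graceful shutdown with a Future: try to commit offsets, but
// give up after a bounded wait and shut down regardless of the outcome.
public class GracefulShutdown {
    // Pretend offset commit: a task that takes ~50ms, standing in for a
    // network round trip to the coordinator.
    static Future<Void> commitAsync(ExecutorService ex) {
        return ex.submit(() -> { Thread.sleep(50); return null; });
    }

    // Returns true if the commit finished within the timeout, false if we
    // abandoned it and shut down anyway.
    public static boolean shutdown(long timeoutMs) throws Exception {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            commitAsync(ex).get(timeoutMs, TimeUnit.MILLISECONDS);  // bounded wait
            return true;
        } catch (TimeoutException e) {
            return false;  // commit abandoned; proceed with shutdown
        } finally {
            ex.shutdownNow();
        }
    }
}
```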
 
  Regardless of the interface we settle on, I'd argue that we should get rid
  of the infinite retry version, at least limiting it to a max # of retries,
  each of which is bound by a timeout. It's really problematic having it run
  indefinitely, since it locks up the consumer, which means you can't,
  e.g., shut down properly. More generally, I think anytime we have an API
  where a TimeoutException is *not* a possibility, we're almost definitely
  trying to hide the network from the user in a way that makes it difficult
  for them to write applications that behave correctly.
 
  On the muting implementation, I'm not sure I'm convinced it's *required* to
  mute the others entirely. Couldn't Selector.poll have a mechanism for
  prioritizing reads rather than completely muting the other nodes? For
  example, what if poll() did one select for just the key we're currently
  keeping unmuted with the timeout, then if there are no keys ready to read,
  reregister interest and select(0) to only get the ones that are immediately
  ready.
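The two-phase poll idea could be sketched, very loosely and without real java.nio plumbing, like this (readiness is simulated with a map; the real change would live in Selector.poll):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy illustration of prioritized polling: phase 1 checks only the
// prioritized node; only if it has nothing ready do we fall back to a
// zero-timeout check of everyone else. Not real java.nio code.
public class PriorityPoll {
    public static List<String> poll(String prioritized, Map<String, Boolean> ready) {
        // Phase 1: select on just the prioritized node
        if (Boolean.TRUE.equals(ready.get(prioritized))) {
            return List.of(prioritized);
        }
        // Phase 2: "select(0)" over the rest, taking whatever is immediately ready
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : ready.entrySet()) {
            if (e.getValue()) out.add(e.getKey());
        }
        return out;
    }
}
```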
 
  Jay -- the polling thing isn't an issue if you just poll from within the
  Future. That's how the WIP patch works. My only concern with that is that
  it's unintuitive, because most future implementations are just waiting for
  another thread to finish some operation; the only potentially bad side
  effect I could think of is that user callbacks might then run in the thread
  calling Future.get(), which might be unexpected if users think they are
  doing all the work of polling in a different thread.
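A toy illustration of a future whose get() drives the work itself, rather than waiting on another thread (purely illustrative, not the WIP patch's code):

```java
// Sketch of a "self-driving" future: get() performs the polling work in the
// caller's thread instead of blocking on a background thread.
public class PollingFuture {
    private boolean done = false;
    private int result;
    private int iterations = 0;

    // One unit of "network poll" work; here it just counts to a threshold.
    private void pollOnce() {
        if (++iterations >= 3) {
            result = 42;
            done = true;
        }
    }

    // The caller's own thread drives completion; any completion callbacks
    // would therefore also run in this thread, which is the surprise noted above.
    public int get() {
        while (!done) pollOnce();
        return result;
    }
}
```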
 
  -Ewen
 
 
  On Wed, Apr 22, 

Re: [DISCUSS] New consumer offset commit API

2015-06-09 Thread Joel Koshy
 same time I would prefer additional churn to the API.

I meant to say prefer _less_ churn to the API.

On Tue, Jun 09, 2015 at 08:35:35AM -0700, Joel Koshy wrote:
 Ewen,
 
 Sorry for the late comment, but while we are discussing this API I
 think we should also consider the addition of metadata to the offset
 commit. This is supported on the broker-side but there is no means in
 the current API to include metadata or retention time in the offset
 commit.  Ideally, this would also mean changing the committed API to
 return the offset + metadata. I realize this is orthogonal to what you
 are trying to achieve, so I'm okay with breaking this out - at the
 same time I would prefer additional churn to the API.
 
 Thanks,
 
 Joel
 
Kafka KIP hangout June 9

2015-06-09 Thread Jun Rao
Hi, Everyone,

We will have a KIP hangout at 11 PST on June 9. The following is the
agenda. If you want to attend and are not on the invite, please let me know.

Agenda:
KIP-12 (sasl/ssl authentication): status check
KIP-4: keeping ISR in TopicMetadataResponse
KIP-25: (system test)

Thanks,

Jun


Re: Review Request 33378: Patch for KAFKA-2136

2015-06-09 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/
---

(Updated June 9, 2015, 5:07 p.m.)


Review request for kafka, Joel Koshy and Jun Rao.


Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136


Repository: kafka


Description (updated)
---

Addressing Joel's comments


Merging


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
8686d83aa52e435c6adafbe9ff4bd1602281072a 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
eb8951fba48c335095cc43fc3672de1c733e07ff 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
37ec0b79beafcf5735c386b066eb319fb697eff5 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
e3cc1967e407b64cc734548c19e30de700b64ba8 
  core/src/main/scala/kafka/api/FetchRequest.scala 
5b38f8554898e54800abd65a7415dd0ac41fd958 
  core/src/main/scala/kafka/api/FetchResponse.scala 
0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
c866180d3680da03e48d374415f10220f6ca68c4 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
c16f7edd322709060e54c77eb505c44cbd77a4ec 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
de6cf5bdaa0e70394162febc63b50b55ca0a92db 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda 
  core/src/main/scala/kafka/server/KafkaApis.scala 
d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
  core/src/main/scala/kafka/server/OffsetManager.scala 
5cca85cf727975f6d3acb2223fd186753ad761dc 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
5717165f2344823fabe8f7cfafae4bb8af2d949a 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
00d59337a99ac135e8689bd1ecd928f7b1423d79 

Diff: https://reviews.apache.org/r/33378/diff/


Testing
---

New tests added


Thanks,

Aditya Auradkar



[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays

2015-06-09 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14579233#comment-14579233
 ] 

Aditya A Auradkar commented on KAFKA-2136:
--

Updated reviewboard https://reviews.apache.org/r/33378/diff/
 against branch origin/trunk

 Client side protocol changes to return quota delays
 ---

 Key: KAFKA-2136
 URL: https://issues.apache.org/jira/browse/KAFKA-2136
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, 
 KAFKA-2136_2015-05-06_18:35:54.patch, KAFKA-2136_2015-05-11_14:50:56.patch, 
 KAFKA-2136_2015-05-12_14:40:44.patch, KAFKA-2136_2015-06-09_10:07:13.patch


 As described in KIP-13, evolve the protocol to return a throttle_time_ms in 
 the Fetch and the ProduceResponse objects. Add client side metrics on the new 
 producer and consumer to expose the delay time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays

2015-06-09 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-2136:
-
Attachment: KAFKA-2136_2015-06-09_10:07:13.patch

 Client side protocol changes to return quota delays
 ---

 Key: KAFKA-2136
 URL: https://issues.apache.org/jira/browse/KAFKA-2136
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, 
 KAFKA-2136_2015-05-06_18:35:54.patch, KAFKA-2136_2015-05-11_14:50:56.patch, 
 KAFKA-2136_2015-05-12_14:40:44.patch, KAFKA-2136_2015-06-09_10:07:13.patch


 As described in KIP-13, evolve the protocol to return a throttle_time_ms in 
 the Fetch and the ProduceResponse objects. Add client side metrics on the new 
 producer and consumer to expose the delay time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33378: Patch for KAFKA-2136

2015-06-09 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/
---

(Updated June 9, 2015, 5:08 p.m.)


Review request for kafka, Joel Koshy and Jun Rao.


Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136


Repository: kafka


Description (updated)
---

Changes are
- protocol changes to the fetch request and response to return the 
throttle_time_ms to clients
- New producer/consumer metrics to expose the avg and max delay time for a 
client
- Test cases.
- Addressed Joel's comments

For now the patch will publish a zero delay and return a response
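The avg/max delay metrics mentioned above amount to bookkeeping along these lines (a toy sketch for illustration; the actual patch uses Kafka's metrics library, not this class):

```java
// Toy sketch of avg/max throttle-time tracking as described in the patch
// summary. Illustrative only; real client metrics use org.apache.kafka.common.metrics.
public class ThrottleTimeMetrics {
    private long count = 0;
    private long sum = 0;
    private long maxSeen = 0;

    // Record the throttle_time_ms returned in a fetch or produce response.
    public void record(long throttleTimeMs) {
        count++;
        sum += throttleTimeMs;
        if (throttleTimeMs > maxSeen) maxSeen = throttleTimeMs;
    }

    public double avg() { return count == 0 ? 0.0 : (double) sum / count; }
    public long max() { return maxSeen; }
}
```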


Diffs
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
8686d83aa52e435c6adafbe9ff4bd1602281072a 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
eb8951fba48c335095cc43fc3672de1c733e07ff 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
37ec0b79beafcf5735c386b066eb319fb697eff5 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
e3cc1967e407b64cc734548c19e30de700b64ba8 
  core/src/main/scala/kafka/api/FetchRequest.scala 
5b38f8554898e54800abd65a7415dd0ac41fd958 
  core/src/main/scala/kafka/api/FetchResponse.scala 
0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
c866180d3680da03e48d374415f10220f6ca68c4 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
c16f7edd322709060e54c77eb505c44cbd77a4ec 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
de6cf5bdaa0e70394162febc63b50b55ca0a92db 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda 
  core/src/main/scala/kafka/server/KafkaApis.scala 
d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
  core/src/main/scala/kafka/server/OffsetManager.scala 
5cca85cf727975f6d3acb2223fd186753ad761dc 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
5717165f2344823fabe8f7cfafae4bb8af2d949a 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
00d59337a99ac135e8689bd1ecd928f7b1423d79 

Diff: https://reviews.apache.org/r/33378/diff/


Testing
---

New tests added


Thanks,

Aditya Auradkar



[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays

2015-06-09 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14579241#comment-14579241
 ] 

Aditya A Auradkar commented on KAFKA-2136:
--

Updated reviewboard https://reviews.apache.org/r/33378/diff/
 against branch origin/trunk

 Client side protocol changes to return quota delays
 ---

 Key: KAFKA-2136
 URL: https://issues.apache.org/jira/browse/KAFKA-2136
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, 
 KAFKA-2136_2015-05-06_18:35:54.patch, KAFKA-2136_2015-05-11_14:50:56.patch, 
 KAFKA-2136_2015-05-12_14:40:44.patch, KAFKA-2136_2015-06-09_10:07:13.patch, 
 KAFKA-2136_2015-06-09_10:10:25.patch


 As described in KIP-13, evolve the protocol to return a throttle_time_ms in 
 the Fetch and the ProduceResponse objects. Add client side metrics on the new 
 producer and consumer to expose the delay time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33378: Patch for KAFKA-2136

2015-06-09 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/
---

(Updated June 9, 2015, 5:10 p.m.)


Review request for kafka, Joel Koshy and Jun Rao.


Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136


Repository: kafka


Description (updated)
---

Addressing Joel's comments


Merging


Changing variable name


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
8686d83aa52e435c6adafbe9ff4bd1602281072a 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
eb8951fba48c335095cc43fc3672de1c733e07ff 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
37ec0b79beafcf5735c386b066eb319fb697eff5 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
e3cc1967e407b64cc734548c19e30de700b64ba8 
  core/src/main/scala/kafka/api/FetchRequest.scala 
5b38f8554898e54800abd65a7415dd0ac41fd958 
  core/src/main/scala/kafka/api/FetchResponse.scala 
0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
c866180d3680da03e48d374415f10220f6ca68c4 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
c16f7edd322709060e54c77eb505c44cbd77a4ec 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
de6cf5bdaa0e70394162febc63b50b55ca0a92db 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda 
  core/src/main/scala/kafka/server/KafkaApis.scala 
d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
  core/src/main/scala/kafka/server/OffsetManager.scala 
5cca85cf727975f6d3acb2223fd186753ad761dc 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
5717165f2344823fabe8f7cfafae4bb8af2d949a 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
00d59337a99ac135e8689bd1ecd928f7b1423d79 

Diff: https://reviews.apache.org/r/33378/diff/


Testing
---

New tests added


Thanks,

Aditya Auradkar



[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays

2015-06-09 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-2136:
-
Attachment: KAFKA-2136_2015-06-09_10:10:25.patch

 Client side protocol changes to return quota delays
 ---

 Key: KAFKA-2136
 URL: https://issues.apache.org/jira/browse/KAFKA-2136
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, 
 KAFKA-2136_2015-05-06_18:35:54.patch, KAFKA-2136_2015-05-11_14:50:56.patch, 
 KAFKA-2136_2015-05-12_14:40:44.patch, KAFKA-2136_2015-06-09_10:07:13.patch, 
 KAFKA-2136_2015-06-09_10:10:25.patch


 As described in KIP-13, evolve the protocol to return a throttle_time_ms in 
 the Fetch and the ProduceResponse objects. Add client side metrics on the new 
 producer and consumer to expose the delay time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33378: Patch for KAFKA-2136

2015-06-09 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/
---

(Updated June 9, 2015, 5:10 p.m.)


Review request for kafka, Joel Koshy and Jun Rao.


Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136


Repository: kafka


Description (updated)
---

Changes are
- protocol changes to the fetch request and response to return the 
throttle_time_ms to clients
- New producer/consumer metrics to expose the avg and max delay time for a 
client
- Test cases.
- Addressed Joel's comments

For now the patch will publish a zero delay and return a response


Diffs
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
56281ee15cc33dfc96ff64d5b1e596047c7132a4 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
8686d83aa52e435c6adafbe9ff4bd1602281072a 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
eb8951fba48c335095cc43fc3672de1c733e07ff 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
37ec0b79beafcf5735c386b066eb319fb697eff5 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
e3cc1967e407b64cc734548c19e30de700b64ba8 
  core/src/main/scala/kafka/api/FetchRequest.scala 
5b38f8554898e54800abd65a7415dd0ac41fd958 
  core/src/main/scala/kafka/api/FetchResponse.scala 
0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
c866180d3680da03e48d374415f10220f6ca68c4 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
c16f7edd322709060e54c77eb505c44cbd77a4ec 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
de6cf5bdaa0e70394162febc63b50b55ca0a92db 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda 
  core/src/main/scala/kafka/server/KafkaApis.scala 
d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
  core/src/main/scala/kafka/server/OffsetManager.scala 
5cca85cf727975f6d3acb2223fd186753ad761dc 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
5717165f2344823fabe8f7cfafae4bb8af2d949a 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
00d59337a99ac135e8689bd1ecd928f7b1423d79 

Diff: https://reviews.apache.org/r/33378/diff/


Testing
---

New tests added


Thanks,

Aditya Auradkar



KIP Call Notes

2015-06-09 Thread Gwen Shapira
Shortest KIP discussion ever :)

Here are my notes. Feel free to correct or expand:

SSL: Harsh is rebasing after KAFKA-1928 and resolving an issue with
support for a specific API.

KIP-4  - ISR in Metadata request: We prefer to preserve this information.

KIP-25 - System Tests: Geoffrey posted an update with a discussion
summary; there were no objections to the proposal. We will wait a day
before starting a vote.

New consumer - Separate the API changes for metadata to a new JIRA.
Since the new consumer is still a work in progress, there's no need for a
KIP for every modification.


Re: Review Request 33378: Patch for KAFKA-2136

2015-06-09 Thread Aditya Auradkar


 On June 5, 2015, 2:43 a.m., Joel Koshy wrote:
  Overall, looks good.
  
  General comment on the naming: "delay" vs. "throttle". Personally, I prefer
  "throttle" - I think that is clearer from the client perspective.
  
  Should probably add a test to verify that the replica fetchers are never 
  throttled. Or this may be more relevant for your other patch.

I've changed the names to "throttle" everywhere. I'll add a replica fetcher test
to my other patch.
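For context, the avg/max throttle-time metrics described in the review request below can be sketched roughly as follows. Names are illustrative only; the actual patch registers sensors through the Kafka Metrics library rather than tracking values by hand.

```java
// Minimal sketch of per-client throttle-time tracking (hypothetical names;
// the real KAFKA-2136 patch wires this through the Kafka Metrics API).
class ThrottleTimeRecorder {
    private long count = 0;
    private long totalMs = 0;
    private long max = 0;

    // Fed with the throttle_time_ms value carried back in each
    // fetch/produce response.
    void record(long throttleTimeMs) {
        count++;
        totalMs += throttleTimeMs;
        max = Math.max(max, throttleTimeMs);
    }

    double avgMs() { return count == 0 ? 0.0 : (double) totalMs / count; }
    long maxMs() { return max; }
}
```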


- Aditya


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/#review86428
---


On June 9, 2015, 5:10 p.m., Aditya Auradkar wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33378/
 ---
 
 (Updated June 9, 2015, 5:10 p.m.)
 
 
 Review request for kafka, Joel Koshy and Jun Rao.
 
 
 Bugs: KAFKA-2136
 https://issues.apache.org/jira/browse/KAFKA-2136
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Changes are
 - protocol changes to the fetch request and response to return the 
 throttle_time_ms to clients
 - New producer/consumer metrics to expose the avg and max delay time for a 
 client
 - Test cases.
 - Addressed Joel's comments
 
 For now the patch will publish a zero delay and return a response
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
 8686d83aa52e435c6adafbe9ff4bd1602281072a 
   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
 eb8951fba48c335095cc43fc3672de1c733e07ff 
   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
 fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
 37ec0b79beafcf5735c386b066eb319fb697eff5 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  e3cc1967e407b64cc734548c19e30de700b64ba8 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 5b38f8554898e54800abd65a7415dd0ac41fd958 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 c866180d3680da03e48d374415f10220f6ca68c4 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 c16f7edd322709060e54c77eb505c44cbd77a4ec 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 de6cf5bdaa0e70394162febc63b50b55ca0a92db 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 05078b24ef28f2f4e099afa943e43f1d00359fda 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 5cca85cf727975f6d3acb2223fd186753ad761dc 
   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
 b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 5717165f2344823fabe8f7cfafae4bb8af2d949a 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 00d59337a99ac135e8689bd1ecd928f7b1423d79 
 
 Diff: https://reviews.apache.org/r/33378/diff/
 
 
 Testing
 ---
 
 New tests added
 
 
 Thanks,
 
 Aditya Auradkar
 




[jira] [Commented] (KAFKA-2232) make MockProducer generic

2015-06-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14579324#comment-14579324
 ] 

Jun Rao commented on KAFKA-2232:


[~apakulov], we haven't officially moved to submitting patches through git pull 
requests. Could you attach the patch to the jira (perhaps using the 
patch-review tool)? Thanks,

 make MockProducer generic
 -

 Key: KAFKA-2232
 URL: https://issues.apache.org/jira/browse/KAFKA-2232
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.1
Reporter: Jun Rao
  Labels: newbie
 Fix For: 0.8.3


 Currently, MockProducer implements Producer<byte[], byte[]>. Instead, we 
 should implement MockProducer<K, V>.
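A minimal standalone sketch of the proposed generic form (these are simplified stand-ins, not the real org.apache.kafka.clients.producer classes):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified sketch of the proposed change: the mock is parameterized by
// key/value types instead of hard-coding byte[].
class ProducerRecord<K, V> {
    final String topic; final K key; final V value;
    ProducerRecord(String topic, K key, V value) {
        this.topic = topic; this.key = key; this.value = value;
    }
}

class MockProducer<K, V> {
    private final List<ProducerRecord<K, V>> sent = new ArrayList<>();

    void send(ProducerRecord<K, V> record) { sent.add(record); }

    // Tests inspect what was "produced" without a real broker.
    List<ProducerRecord<K, V>> history() {
        return Collections.unmodifiableList(sent);
    }
}
```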



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 34789: Patch for KAFKA-2168

2015-06-09 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87257
---



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139570

Do we need this? There is no real guarantee on the poll time, so it seems 
that we could just return when wakeup is called.


- Jay Kreps


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




[jira] [Updated] (KAFKA-2232) make MockProducer generic

2015-06-09 Thread Alexander Pakulov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Pakulov updated KAFKA-2232:
-
Attachment: KAFKA-2232.patch

 make MockProducer generic
 -

 Key: KAFKA-2232
 URL: https://issues.apache.org/jira/browse/KAFKA-2232
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.1
Reporter: Jun Rao
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-2232.patch


 Currently, MockProducer implements Producer<byte[], byte[]>. Instead, we 
 should implement MockProducer<K, V>.





Review Request 35261: Patch for KAFKA-2232

2015-06-09 Thread Alexander Pakulov

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35261/
---

Review request for kafka.


Bugs: KAFKA-2232
https://issues.apache.org/jira/browse/KAFKA-2232


Repository: kafka


Description
---

KAFKA-2232: Make MockProducer generic


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
f50da825756938c193d7f07bee953e000e2627d9 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
e66491cc82f11641df6516e7d7abb4a808c27368 
  clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
677edd385f35d4262342b567262c0b874876d25b 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
6372f1a7f7f77d96ba7be05eb927c004f7fefb73 
  clients/src/test/java/org/apache/kafka/test/MockSerializer.java 
e75d2e4e58ae0cdbe276d3a3b652e47795984791 

Diff: https://reviews.apache.org/r/35261/diff/


Testing
---


Thanks,

Alexander Pakulov



[jira] [Commented] (KAFKA-2232) make MockProducer generic

2015-06-09 Thread Alexander Pakulov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14579408#comment-14579408
 ] 

Alexander Pakulov commented on KAFKA-2232:
--

Created reviewboard https://reviews.apache.org/r/35261/diff/
 against branch origin/trunk

 make MockProducer generic
 -

 Key: KAFKA-2232
 URL: https://issues.apache.org/jira/browse/KAFKA-2232
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.1
Reporter: Jun Rao
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-2232.patch


 Currently, MockProducer implements Producer<byte[], byte[]>. Instead, we 
 should implement MockProducer<K, V>.





Re: Review Request 34789: Patch for KAFKA-2168

2015-06-09 Thread Jason Gustafson


 On June 9, 2015, 6:49 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 322
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line322
 
  Do we need this? There is no real guarantee on the poll time, so it 
  seems that we could just return when wakeup is called.

You might be waking up from a synchronous commit, for example. In that case, 
all we can do is raise an exception. We could alternatively say that wakeup 
only applies to the poll() method and cannot be used to interrupt the other 
calls.
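A rough sketch of the pattern under discussion - one thread calls wakeup(), the polling thread catches the exception and closes. The consumer class below is a stand-in; only the ConsumerWakeupException name comes from this patch:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in consumer modeling only the wakeup mechanics.
class ConsumerWakeupException extends RuntimeException {}

class FakeConsumer {
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private boolean closed = false;

    // Safe to call from another thread.
    void wakeup() { wakeupRequested.set(true); }

    String poll(long timeoutMs) {
        if (wakeupRequested.getAndSet(false))
            throw new ConsumerWakeupException();
        return "records"; // the real client would block up to timeoutMs
    }

    void close() { closed = true; }
    boolean isClosed() { return closed; }
}

class PollLoop {
    // Shutdown path: catch the wakeup exception and close the consumer.
    static void run(FakeConsumer consumer) {
        try {
            while (true)
                consumer.poll(100);
        } catch (ConsumerWakeupException e) {
            consumer.close();
        }
    }
}
```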


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87257
---


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34789/
 ---
 
 (Updated June 5, 2015, 7:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2168
 https://issues.apache.org/jira/browse/KAFKA-2168
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2168; refactored callback handling to prevent unnecessary requests
 
 
 KAFKA-2168; address review comments
 
 
 KAFKA-2168; fix rebase error and checkstyle issue
 
 
 KAFKA-2168; address review comments and add docs
 
 
 KAFKA-2168; handle polling with timeout 0
 
 
 KAFKA-2168; timeout=0 means return immediately
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 d1d1ec178f60dc47d408f52a89e52886c1a093a2 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 f50da825756938c193d7f07bee953e000e2627d9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  cee75410127dd1b86c1156563003216d93a086b3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
 677edd385f35d4262342b567262c0b874876d25b 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
 
 Diff: https://reviews.apache.org/r/34789/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




[Discussion] New Consumer API / Protocol

2015-06-09 Thread Guozhang Wang
This email is to kick off some discussion around the changes we want to
make on the new consumer APIs as well as their semantics. Here is a
non-comprehensive list of items in my mind:

1. Poll(timeout): the current definition of timeout states "The time, in
milliseconds, spent waiting in poll if data is not available. If 0, waits
indefinitely." While in the current implementation, we have different
semantics than stated, for example:

a) poll(timeout) can return before timeout elapsed with empty consumed
data.
b) poll(timeout) can return after more than timeout elapsed due to
blocking event like join-group, coordinator discovery, etc.

We should think a bit more on what semantics we really want to provide and
how to provide it in implementation.

2. Thread safeness: currently we have a coarse-grained locking mechanism
that provides thread safeness but blocks commit / position / etc. calls
while poll() is in process. We are considering removing the
coarse-grained locking, with an additional Consumer.wakeup() call to break
the polling, and instead suggesting that users have one consumer client per
thread, which aligns with the design of a single-threaded consumer
(KAFKA-2123).

3. Commit(): we want to improve the async commit calls to add a callback
handler invoked when the commit completes, and to guarantee ordering of
commit calls with retry policies (KAFKA-2168). In addition, we want to
extend the API to expose attaching / fetching offset metadata stored in the
Kafka offset manager.
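The commit-callback idea in item 3 could look roughly like this (hypothetical interface and names; the actual API shape was still being settled at the time, and ordering/retry guarantees are not modeled here):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an async commit that notifies a callback on completion.
interface CommitCallback {
    void onComplete(Map<String, Long> offsets, Exception exception);
}

class FakeCoordinator {
    private final Map<String, Long> committed = new HashMap<>();

    // Non-blocking in spirit; for simplicity the "completion" happens inline.
    void commitAsync(Map<String, Long> offsets, CommitCallback callback) {
        committed.putAll(offsets);
        callback.onComplete(offsets, null); // success path
    }

    Long committed(String partition) { return committed.get(partition); }
}
```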

4. OffsetFetchRequest: currently for handling OffsetCommitRequest we check
the generation id and the assigned partitions before accepting the request
if the group is using Kafka for partition management, but for
OffsetFetchRequest we cannot do this checking since it does not include
groupId / consumerId information. Do people think this is OK, or should we
add this as we did in OffsetCommitRequest?

5. New APIs: there are some other requests to add:

a) offsetsBeforeTime(timestamp): or would seekToEnd and seekToBeginning
be sufficient?

b) listTopics(): or should we just require users to use AdminUtils for such
operations?

There may be other issues that I have missed here, so folks, just bring them
up if you have thought about anything else.

-- Guozhang


Re: Review Request 34789: Patch for KAFKA-2168

2015-06-09 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
---


Thanks for the patch. A few comments below.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139565

Could we add an example of how to use the new wakeup() call, especially 
with closing the consumer properly? For example, does the consumer thread just 
catch the ConsumerWakeupException and then call close()?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139574

Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the 
end of the call, we expect the fetch offset to be set to the beginning. This is 
now changed to async, which doesn't match the intended behavior. We need to 
think through if this matters or not.

Ditto for seekToEnd().



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139577

Should we pass in tp to isOffsetResetNeeded()?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139509

The returned response may be ready already after the offsetBefore call due 
to needing metadata refresh. Since we don't check the ready state immediately 
afterward, we may be delaying the processing of metadata refresh by the request 
timeout.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139564

Currently, our coding convention is not to wrap single-line statements with 
{}. There are a few other cases like this.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139544

In the async mode, response may not be ready in the first iteration. Are we 
handling the retry properly in that case?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139545

We probably need to make some changes here when KAFKA-2120 is done to 
handle the request timeout properly. Perhaps we can add a TODO comment here.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/34789/#comment139563

-1 makes the pollClient block forever. So, we don't get a chance to do the 
wakeup check.



clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
https://reviews.apache.org/r/34789/#comment139588

Do we need NONE?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
https://reviews.apache.org/r/34789/#comment139579

We probably can make it clear that this is a non-blocking call and doesn't 
wait for the request to be sent or the response to be received. It would be 
good to do that on other similar methods too.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
https://reviews.apache.org/r/34789/#comment139537

Should "asynchronous" be "synchronous"?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
https://reviews.apache.org/r/34789/#comment139586

Similar as the above, it would be useful to make it clear that 
fetchOffsets() is non-blocking.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
https://reviews.apache.org/r/34789/#comment139534

Perhaps we can define a static method to initialize the constant and set the 
state. It's clearer that way since the instantiation and the initialization are 
in the same place. With this, we probably don't need the static getter methods 
and can just let the caller use the static constants directly.

Ditto for BrokerResult.
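The suggestion above, sketched with illustrative names (not the actual CoordinatorResult code): construction and state initialization sit together in a static factory, so callers can use the factories directly instead of static getters.

```java
// Sketch of the static-factory suggestion: the shared "constant" results are
// built and initialized in one place. Names are illustrative.
class Result<T> {
    enum State { READY, PENDING }

    private final State state;
    private final T value;

    private Result(State state, T value) {
        this.state = state;
        this.value = value;
    }

    // Factory methods keep instantiation and initialization together.
    static <T> Result<T> ready(T value) { return new Result<>(State.READY, value); }
    static <T> Result<T> pending() { return new Result<>(State.PENDING, null); }

    boolean isReady() { return state == State.READY; }
    T value() { return value; }
}
```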



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
https://reviews.apache.org/r/34789/#comment139540

The comment seems inaccurate. We are not returning an error, but returning 
a remedy instead.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
https://reviews.apache.org/r/34789/#comment139504

Do we need both hasRemedy and hasException? It seems that if one returns 
true, the other should always return false.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
https://reviews.apache.org/r/34789/#comment139578

To be consistent with the naming convention of the rest of the methods, 
should we just name it offsetResetNeeded()?


- Jun Rao


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 

Re: [Discussion] New Consumer API / Protocol

2015-06-09 Thread Jiangjie Qin
Thanks for kicking off this discussion, Guozhang.
We might also want to discuss the API to expose the high watermark. Some
discussion has been there in KAFKA-2076.

Thanks,

Jiangjie (Becket) Qin

On 6/9/15, 1:12 PM, Guozhang Wang wangg...@gmail.com wrote:

This email is to kick-off some discussion around the changes we want to
make on the new consumer APIs as well as their semantics. Here are a
not-comprehensive list of items in my mind:

1. Poll(timeout): current definition of timeout states "The time, in
milliseconds, spent waiting in poll if data is not available. If 0, waits
indefinitely." While in the current implementation, we have different
semantics as stated, for example:

a) poll(timeout) can return before timeout elapsed with empty consumed
data.
b) poll(timeout) can return after more than timeout elapsed due to
blocking event like join-group, coordinator discovery, etc.

We should think a bit more on what semantics we really want to provide and
how to provide it in implementation.

2. Thread safeness: currently we have a coarse-grained locking mechanism
that provides thread safeness but blocks commit / position / etc calls
while poll() is in process. We are considering to remove the
coarse-grained locking with an additional Consumer.wakeup() call to break
the polling, and instead suggest users to have one consumer client per
thread, which aligns with the design of a single-threaded consumer
(KAFKA-2123).

3. Commit(): we want to improve the async commit calls to add a callback
handler upon commit completes, and guarantee ordering of commit calls with
retry policies (KAFKA-2168). In addition, we want to extend the API to
expose attaching / fetching offset metadata stored in the Kafka offset
manager.

4. OffsetFetchRequest: currently for handling OffsetCommitRequest we check
the generation id and the assigned partitions before accepting the request
if the group is using Kafka for partition management, but for
OffsetFetchRequest we cannot do this checking since it does not include
groupId / consumerId information. Do people think this is OK or we should
add this as we did in OffsetCommitRequest?

5. New APIs: there are some other requests to add:

a) offsetsBeforeTime(timestamp): or would seekToEnd and seekToBeginning
be sufficient?

b) listTopics(): or should we just require users to use AdminUtils for such
operations?

There may be other issues that I have missed here, so folks just bring it
up if you thought about anything else.

-- Guozhang



Re: [Discussion] New Consumer API / Protocol

2015-06-09 Thread Jason Gustafson
Just a minor correction, but #2 is KAFKA-2168 and #3 is KAFKA-2123.

For #1, I think there should be some minimal effort to make the poll
respect the timeout (with the understanding that spurious wakeups can
happen). I think this really just means calling NetworkClient.poll() in a
loop and checking if there are any results to return. An additional
complication is that long poll timeouts can prevent heartbeats or
auto-committed offsets from being sent. This means that the maximum time
that the consumer should actually block in NetworkClient.poll() should be
the minimum of the poll timeout, the heartbeat interval, and the
auto-commit interval (if it's set). I attempted to handle this in my patch
for KAFKA-2168, which is pending review.
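That loop might be sketched as follows (stand-in network client; names are illustrative, not the actual KAFKA-2168 patch): each pass blocks in the network layer for at most the minimum of the remaining poll budget, the heartbeat interval, and the auto-commit interval.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the bounded poll loop: background work (heartbeats, auto-commits)
// is never starved by a long poll timeout, and spurious early returns are
// absorbed by re-polling until the deadline.
class BoundedPoller {
    interface NetworkClient {
        List<String> poll(long maxBlockMs); // stand-in for NetworkClient.poll()
    }

    static List<String> poll(NetworkClient client, long timeoutMs,
                             long heartbeatIntervalMs, long autoCommitIntervalMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        long remaining = timeoutMs;
        do {
            long maxBlock = Math.min(remaining,
                    Math.min(heartbeatIntervalMs, autoCommitIntervalMs));
            List<String> records = client.poll(maxBlock);
            // heartbeats / auto-commits would be sent here between network polls
            if (!records.isEmpty())
                return records;
            remaining = deadline - System.currentTimeMillis();
        } while (remaining > 0);
        return new ArrayList<>(); // timeout elapsed with no data
    }
}
```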

I tend to agree with Jay on 1b though. Trying to leave the JoinGroup
pending could get tricky.


On Tue, Jun 9, 2015 at 4:25 PM, Jay Kreps jay.kr...@gmail.com wrote:

 My two cents:

 Overall I think our focus as far as extensions go should be on (1) things
 which would be incompatible if changed later and (2) operationalization.
 There are lots of new apis that could be nice to have, but I think if
 adding them later will just be an api addition we should hold off and get
 out what we have, if it will be ugly to add later then let's try to get it
 in now.

 1a. I don't think there is too much value in attempting to avoid spurious
 wakeups. Several people have asked about this but I think the only usage
 that makes sense is in an event loop--since you can always get 0 events due
 to a timeout. I think we should just document that the timeout is just
 there for guidance and not strictly enforced. I think this is intuitive
 since no timeout like this is ever really strictly enforced (if there is a
 30 second gc pause we will be off by 30 seconds regardless of how diligent
 we are in our own code)
 1b. Same here. I think we should just document this. Trying to return in
 the middle of a joinGroup will be very complex so I think we should just
 document this.
 Also: The docs currently say that a timeout of 0 blocks forever but I think
 that might be wrong. Regardless I think for consistency a timeout of 0
 should be non-blocking and a timeout of MAX_INT should be used to block
 forever.

 2. Here is my understanding of this one. I think on this there was
 originally some interest in tightening up the locking to allow more
 parallelism in consumer usage. Personally I think this adds a lot of
 complexity and would prefer to avoid it. My concern is primarily around
 implementation complexity--I think without a very careful, well-abstracted
 threading model, a big chunk of code with lots of ad hoc locking, even if
 perfect when initially written, is just very hard to maintain. I also think
 the single threaded usage pattern is also easier for the end-user and
 likely faster, though there are some downsides. After some discussion I
 think there was a second proposal to instead leave the locking as is and
 add a wakeup call. The use case for this would be something like quickly
 interrupting and shutting down a thread that is in its event loop. I think
 this makes sense though we could probably release a v1 without it if need
 be. I think an open question here is whether interrupt needs to work for
 all calls (e.g. commit) or just poll, with other calls having a defacto
 timeout from the request timeout and retry count. I would vote for the
 latter if it helps with implementation simplicity.

 3. I think both these extensions make sense. It would be nice to get them
 into the first release to avoid api churn.

 4. Not sure I fully understand.

 5a. The rationale for doing seekToBeginning and seekToEnd was that we felt
 that we might need to think a little bit about the offset list api a bit
 more before knowing how best to expose it. We clearly need something here
 but the hope was the two seekTo calls were good enough to get started and
 we could wait to think it out properly to add the more general call. I
 think the thinking was basically (a) the time mechanism for the
 intermediate segments is often wrong, (b) the protocol that does the fetch
 is quite ad hoc and perhaps there is more per-segment info that should be
 in that request, and (c) the api in simple consumer is very unintuitive. If
 we are adding an end-state solution

 5b. Not sure but can we add it later? How would mm work?


 On Tue, Jun 9, 2015 at 1:12 PM, Guozhang Wang wangg...@gmail.com wrote:

  This email is to kick-off some discussion around the changes we want to
  make on the new consumer APIs as well as their semantics. Here are a
  not-comprehensive list of items in my mind:
 
  1. Poll(timeout): current definition of timeout states "The time, in
  milliseconds, spent waiting in poll if data is not available. If 0, waits
  indefinitely." While in the current implementation, we have different
  semantics as stated, for example:
 
  a) poll(timeout) can return before timeout elapsed with empty consumed
  data.
  b) 

Re: [Discussion] New Consumer API / Protocol

2015-06-09 Thread Jay Kreps
My two cents:

Overall I think our focus as far as extensions go should be on (1) things
which would be incompatible if changed later and (2) operationalization.
There are lots of new apis that could be nice to have, but I think if
adding them later will just be an api addition we should hold off and get
out what we have, if it will be ugly to add later then let's try to get it
in now.

1a. I don't think there is too much value in attempting to avoid spurious
wakeups. Several people have asked about this but I think the only usage
that makes sense is in an event loop--since you can always get 0 events due
to a timeout. I think we should just document that the timeout is just
there for guidance and not strictly enforced. I think this is intuitive
since no timeout like this is ever really strictly enforced (if there is a
30 second gc pause we will be off by 30 seconds regardless of how diligent
we are in our own code)
1b. Same here. I think we should just document this. Trying to return in
the middle of a joinGroup will be very complex so I think we should just
document this.
Also: The docs currently say that a timeout of 0 blocks forever but I think
that might be wrong. Regardless I think for consistency a timeout of 0
should be non-blocking and a timeout of MAX_INT should be used to block
forever.

2. Here is my understanding of this one. I think on this there was
originally some interest in tightening up the locking to allow more
parallelism in consumer usage. Personally I think this adds a lot of
complexity and would prefer to avoid it. My concern is primarily around
implementation complexity--I think without a very careful, well-abstracted
threading model, a big chunk of code with lots of ad hoc locking, even if
perfect when initially written, is just very hard to maintain. I also think
the single threaded usage pattern is also easier for the end-user and
likely faster, though there are some downsides. After some discussion I
think there was a second proposal to instead leave the locking as is and
add a wakeup call. The use case for this would be something like quickly
interrupting and shutting down a thread that is in its event loop. I think
this makes sense though we could probably release a v1 without it if need
be. I think an open question here is whether interrupt needs to work for
all calls (e.g. commit) or just poll, with other calls having a de facto
timeout from the request timeout and retry count. I would vote for the
latter if it helps with implementation simplicity.
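The wakeup call discussed here (and in KAFKA-2123) could take roughly this shape: a flag set from any thread breaks a blocked poll by raising an unchecked exception. This is a sketch of the idea only; the names are assumptions, not the actual consumer code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of a Consumer.wakeup() mechanism for interrupting
// a thread stuck in its poll loop, e.g. during shutdown.
public class WakeupSketch {
    static class WakeupException extends RuntimeException {}

    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    // Safe to call from any thread.
    public void wakeup() {
        wakeup.set(true);
    }

    // Called periodically inside poll(); throws if a wakeup was requested,
    // and clears the flag so subsequent polls proceed normally.
    public void maybeThrowWakeup() {
        if (wakeup.compareAndSet(true, false))
            throw new WakeupException();
    }
}
```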

3. I think both these extensions make sense. It would be nice to get them
into the first release to avoid api churn.

4. Not sure I fully understand.

5a. The rationale for doing seekToBeginning and seekToEnd was that we felt
we might need to think a bit more about the offset list api
before knowing how best to expose it. We clearly need something here
but the hope was the two seekTo calls were good enough to get started and
we could wait to think it out properly to add the more general call. I
think the thinking was basically (a) the time mechanism for the
intermediate segments is often wrong, (b) the protocol that does the fetch
is quite ad hoc and perhaps there is more per-segment info that should be
in that request, and (c) the api in simple consumer is very unintuitive. If
we are adding an end-state solution

5b. Not sure but can we add it later? How would mm work?


On Tue, Jun 9, 2015 at 1:12 PM, Guozhang Wang wangg...@gmail.com wrote:

 This email is to kick-off some discussion around the changes we want to
 make on the new consumer APIs as well as their semantics. Here are a
 not-comprehensive list of items in my mind:

 1. Poll(timeout): the current definition of timeout states "The time, in
 milliseconds, spent waiting in poll if data is not available. If 0, waits
 indefinitely." The current implementation, however, has different
 semantics than stated, for example:

 a) poll(timeout) can return before timeout elapsed with empty consumed
 data.
 b) poll(timeout) can return after more than timeout elapsed due to
 blocking event like join-group, coordinator discovery, etc.

 We should think a bit more on what semantics we really want to provide and
 how to provide it in implementation.

 2. Thread safety: currently we have a coarse-grained locking mechanism
 that provides thread safety but blocks commit / position / etc calls
 while poll() is in process. We are considering removing the
 coarse-grained locking, adding a Consumer.wakeup() call to break
 the polling, and instead suggesting that users have one consumer client per
 thread, which aligns with the design of a single-threaded consumer
 (KAFKA-2123).

 3. Commit(): we want to improve the async commit calls to add a callback
 handler invoked when the commit completes, and guarantee ordering of commit
 calls with retry policies (KAFKA-2168). In addition, we want to extend the API to
 expose attaching / fetching offset metadata stored in the Kafka offset
 
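The callback-based async commit described in item 3 might look roughly like this. The interface name and the string-keyed offset map are assumptions for illustration; the real API is tracked by KAFKA-2168.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an async commit with a completion callback. A real client
// would send an OffsetCommitRequest and invoke the callback on response.
public class CommitCallbackSketch {
    interface OffsetCommitCallback {
        void onComplete(Map<String, Long> offsets, Exception error);
    }

    private final Map<String, Long> committed = new HashMap<>();

    // Fire-and-forget commit; success or failure is reported via callback.
    public void commitAsync(Map<String, Long> offsets, OffsetCommitCallback cb) {
        committed.putAll(offsets);     // stand-in for the broker round trip
        cb.onComplete(offsets, null);  // null error indicates success
    }

    public Long committed(String topicPartition) {
        return committed.get(topicPartition);
    }
}
```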

Re: Review Request 34789: Patch for KAFKA-2168

2015-06-09 Thread Jason Gustafson


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 995
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line995
 
  Should we pass in tp to isOffsetResetNeeded()?

Yes, we should. I'll fix it.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 797-798
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797
 
  Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at 
  the end of the call, we expect the fetch offset to be set to the beginning. 
  This is now changed to async, which doesn't match the intended behavior. We 
  need to think through if this matters or not.
  
  Ditto for seekToEnd().

Since we always update fetch positions before a new fetch and in position(), it 
didn't seem necessary to make it synchronous. I thought this handling might be 
more consistent with how new subscriptions are handled (which are asynchronous 
and defer the initial offset fetch until the next poll or position). That being 
said, I don't have a strong feeling about it, so we could return to the 
blocking version.
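The deferred handling Jason describes (seek only marks the partition; the actual offset lookup happens lazily on the next poll or position call) can be sketched like this. Names and the string-keyed partition are illustrative assumptions, not the patch's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of lazy offset reset: seekToBeginning() defers the offset fetch
// until the position is actually needed, mirroring how new subscriptions
// defer their initial offset fetch.
public class LazySeekSketch {
    private final Map<String, Long> positions = new HashMap<>();
    private final Map<String, Boolean> awaitingReset = new HashMap<>();

    public void seek(String tp, long offset) {
        awaitingReset.remove(tp);
        positions.put(tp, offset);
    }

    public void seekToBeginning(String tp) {
        awaitingReset.put(tp, true);  // no blocking offset fetch here
    }

    // position() (and, in a real consumer, poll()) resolves pending resets.
    public long position(String tp) {
        if (Boolean.TRUE.equals(awaitingReset.remove(tp)))
            positions.put(tp, fetchEarliestOffset(tp));
        return positions.getOrDefault(tp, 0L);
    }

    private long fetchEarliestOffset(String tp) {
        return 0L;  // stand-in for an offset request to the broker
    }
}
```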


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1039-1040
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039
 
  The returned response may already be ready after the offsetBefore call 
  because a metadata refresh is needed. Since we don't check the ready state 
  immediately afterward, we may delay the processing of the metadata 
  refresh by the request timeout.

This is a pretty good point. One of the reasons working with NetworkClient is 
tricky is that you need several polls to complete a request: one to connect, 
one to send, and one to receive. In this case, the result might not be ready 
because we are in the middle of connecting to the broker, in which case we need 
to call poll() to finish the connect. If we don't, then the next request will 
just fail for the same reason. I'll look to see if there's a way to fix this to 
avoid unnecessary calls to poll.
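The multi-poll lifecycle Jason describes (one poll to connect, one to send, one to receive) can be pictured as a small state machine where each poll advances at most one stage. This is purely illustrative of why a response is often not ready after a single poll; it is not the NetworkClient implementation.

```java
// Sketch of a request's lifecycle against a non-blocking network client:
// several poll() calls are needed before the response is ready.
public class RequestLifecycleSketch {
    enum State { CONNECTING, SENDING, RECEIVING, READY }

    private State state = State.CONNECTING;

    // One call to poll() advances at most one stage of the request.
    public void poll() {
        switch (state) {
            case CONNECTING: state = State.SENDING;   break;  // finish connect
            case SENDING:    state = State.RECEIVING; break;  // flush the send
            case RECEIVING:  state = State.READY;     break;  // read response
            case READY:      break;
        }
    }

    public boolean ready() {
        return state == State.READY;
    }

    // Blocking helper: keep polling until the response is ready.
    public void awaitReady() {
        while (!ready())
            poll();
    }
}
```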


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1139-1141
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1139
 
  In the async mode, response may not be ready in the first iteration. 
  Are we handling the retry properly in that case?

When an async commit request fails, we do not retry, which is consistent with 
the current consumer. I think Ewen's patch for KAFKA-2123 introduces a good 
approach to retrying async commits. My own preference is to let them fail fast 
as long as the user has the callback from KAFKA-2123 to handle their failure. 
Otherwise, it gets a little tricky trying to preserve their order.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1195
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1195
 
  We probably need to make some changes here when KAFKA-2120 is done to 
  handle the request timeout properly. Perhaps we can add a TODO comment 
  here.

I'll add a note. This also falls in the purview of KAFKA-1894.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java,
   line 16
  https://reviews.apache.org/r/34789/diff/8/?file=980077#file980077line16
 
  Do we need NONE?

It was there before, but I don't think it's actually used. I'd be fine removing 
it.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java,
   lines 20-27
  https://reviews.apache.org/r/34789/diff/8/?file=980080#file980080line20
 
  Perhaps we can define a static method to initialize the constant and set 
  the state. It's clearer that way since the instantiation and the 
  initialization are in the same place. With this, we probably don't need the 
  static getter methods and can just let the caller use the static constants 
  directly.
  
  Ditto for BrokerResult.

Agreed. I'll fix it.
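Jun's suggestion (a static factory so instantiation and state initialization sit together, with callers using the constants directly instead of getters) might look like this. The class and state names are illustrative stand-ins for CoordinatorResult / BrokerResult.

```java
// Sketch of initializing result constants via a static factory, so the
// constant and its state are defined in one place and exposed directly.
public class CoordinatorResultSketch {
    public enum State { OK, RETRY }

    // Callers use these constants directly; no static getters needed.
    public static final CoordinatorResultSketch OK = of(State.OK);
    public static final CoordinatorResultSketch NEED_RETRY = of(State.RETRY);

    private final State state;

    private CoordinatorResultSketch(State state) {
        this.state = state;
    }

    private static CoordinatorResultSketch of(State state) {
        return new CoordinatorResultSketch(state);
    }

    public State state() {
        return state;
    }
}
```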


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 1212
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212
 
  -1 makes the pollClient block forever. So, we don't get a chance to do 
  the wakeup check.

I might be wrong, but I think we can still use NetworkClient.wakeup to 
interrupt a poll call which is waiting forever.


 On June 9, 2015, 7:58 p.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  lines 1078-1080
  https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1078
 
  

Query on kafka broker and topic metadata

2015-06-09 Thread Pavan Chenduluru
Hi,

I am new to Kafka and have a question.

How can I read broker and topic statistics from a Kafka server?

I want to read the following parameters about an existing topic:

1) How many activeMessages
2) How many activeSubscriptions
3) How many totalMessages
4) How many totalSubscriptions
5) How many deliveryFaults
6) How many pendingDelivery

Please advise.

Thanks & Regards,
Pavan