Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review86541 ---

Hi Parth, thank you for your authorization patch. Could you add comments when replying to Jun's review comments? That will make it easier for others to track the discussion.

core/src/main/scala/kafka/common/ErrorMapping.scala
https://reviews.apache.org/r/34492/#comment139453
Why not use 23 here?

core/src/main/scala/kafka/security/auth/Authorizer.scala
https://reviews.apache.org/r/34492/#comment139464
Resource-related operations are authorized, but the authorizer itself does not seem to be. Could a normal user operate on the ACLs? We may need to pass something (e.g. Session) to addAcls, removeAcls, etc.

core/src/main/scala/kafka/security/auth/Authorizer.scala
https://reviews.apache.org/r/34492/#comment139458
I didn't find any code that invokes removeAcls, so it seems Acl entries are not cleaned up when a resource (e.g. a topic) is deleted. Is that really the case, or did I miss something?

core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala
https://reviews.apache.org/r/34492/#comment139452
I think it is better to use constants or an enum to represent the user type.

Thanks,
Dapeng

- Dapeng Sun

On June 5, 2015, 7:07 a.m., Parth Brahmbhatt wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ ---
(Updated June 5, 2015, 7:07 a.m.)

Review request for kafka.

Bugs: KAFKA-2210
https://issues.apache.org/jira/browse/KAFKA-2210

Repository: kafka

Description
---
Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already.
Diffs
-
core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc7518ad76f9548772522751afb4d046b78
core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION
core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370
core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION
core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION
core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION
core/src/main/scala/kafka/security/auth/Operation.java PRE-CREATION
core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION
core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION
core/src/main/scala/kafka/security/auth/ResourceType.java PRE-CREATION
core/src/main/scala/kafka/server/KafkaApis.scala 387e387998fc3a6c9cb585dab02b5f77b0381fbf
core/src/main/scala/kafka/server/KafkaConfig.scala 6f25afd0e5df98258640252661dee271b1795111
core/src/main/scala/kafka/server/KafkaServer.scala e66710d2368334ece66f70d55f57b3f888262620
core/src/test/resources/acl.json PRE-CREATION
core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 71f48c07723e334e6489efab500a43fa93a52d0c

Diff: https://reviews.apache.org/r/34492/diff/

Testing
---

Thanks,
Parth Brahmbhatt
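Dapeng's last comment above suggests replacing the raw user-type string in KafkaPrincipal with constants or an enum. A minimal sketch of what that could look like; the name `PrincipalType` and the set of values are assumptions for illustration, not the actual patch:

```java
// Hypothetical sketch of the suggestion for KafkaPrincipal: an enum instead of
// a free-form user-type string. Type name and values are illustrative only.
public enum PrincipalType {
    USER, GROUP;

    // Parse case-insensitively so "user" and "USER" map to the same constant.
    public static PrincipalType fromString(String name) {
        return valueOf(name.toUpperCase(java.util.Locale.ROOT));
    }
}
```

An enum makes an invalid principal type fail fast at parse time instead of propagating as an unmatched string.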
Re: [DISCUSS] New consumer offset commit API
Ewen,

Sorry for the late comment, but while we are discussing this API I think we should also consider the addition of metadata to the offset commit. This is supported on the broker side, but there is no means in the current API to include metadata or retention time in the offset commit. Ideally, this would also mean changing the committed API to return the offset + metadata. I realize this is orthogonal to what you are trying to achieve, so I'm okay with breaking this out - at the same time I would prefer additional churn to the API.

Thanks,
Joel

On Mon, Apr 27, 2015 at 06:03:05PM -0700, Guozhang Wang wrote:

Thanks Ewen,

1. I agree with you on the pros of Future; my argument for a pure Callback, as I mentioned, is that it sounds unlikely to me that users would usually want to explicitly set a timeout. That said, if people think this scenario is actually common, as in graceful shutdown, then I am OK with Future over just the callback. My concern with Future is that after thinking through the code myself I feel the implementation would be quite complicated.

2. As for infinite retries, for now we have three coordinator-blocking calls: one for join-group, one for sync-commit-offsets, one for fetch-offsets. I think for these three it is appropriate to use blocking calls, since the consumer cannot proceed anyway without getting the response from these calls, according to their semantics.

3. With the priority mechanism I think we can then get rid of the muting mechanism. Not sure if it is possible to implement in java.nio.channels.Selector?

Guozhang

On Thu, Apr 23, 2015 at 2:25 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

Thanks, great feedback everyone.

Jiangjie -- I was worried about interleaving as well. For commits using the consumer's own current set of offsets, I agree we could easily limit to 1 outstanding request so the older one gets cancelled.
For commits that specify offsets manually, we might not need to do anything special, just note in the docs that bad things can happen if you submit two separate offset commit requests (i.e. don't wait for callbacks) and have retries enabled. Alternatively, we could just serialize them, i.e. always have at most 1 outstanding, and retries always take priority.

Guozhang -- That breakdown of use cases looks right to me. I agree that I don't think users of this API would be trying to use the futures for promise pipelining or anything; the only use is to allow them to block on the operation. That said, I think there are some tradeoffs to consider between the two options:

Pros of Future/cons of only having a callback:
* Gives control over timeouts. With only callbacks, users have to manage this themselves if they care about timing. I think there's at least one very common case for this: graceful shutdown, where I want to try to commit offsets, but after some time shut down whether successful or not.
* Consistency. This matches the Producer interface, and Futures are a standard pattern.

Pros of only a callback/cons of Future:
* Know up front if it's sync/async. This might simplify the implementation, as Guozhang points out. (However, it also means the patch needs to add a timeout mechanism, which doesn't exist yet. That's probably not a huge patch, but non-trivial. Maybe it needs to be added regardless.)

Regardless of the interface we settle on, I'd argue that we should get rid of the infinite retry version, at least limiting it to a max # of retries, each of which is bound by a timeout. It's really problematic having it run indefinitely long since it locks up the consumer, which means you can't, e.g., shut down properly. More generally, I think anytime we have an API where a TimeoutException is *not* a possibility, we're almost definitely trying to hide the network from the user in a way that makes it difficult for them to write applications that behave correctly.
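The graceful-shutdown case argued for above can be sketched with a Future-returning commit. `commitAsync()` here is a hypothetical stand-in (backed by a CompletableFuture), not the real consumer API; the point is only the bounded wait:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of bounded-wait commit on shutdown. commitAsync() is hypothetical;
// it stands in for a consumer call that returns a Future.
public class ShutdownSketch {
    static Future<Void> commitAsync() {
        return CompletableFuture.runAsync(() -> { /* send OffsetCommitRequest */ });
    }

    // Returns true if the commit completed before the deadline; either way,
    // the caller proceeds with shutdown instead of retrying forever.
    static boolean commitBeforeShutdown(long timeoutMs) {
        try {
            commitAsync().get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            return false; // shut down anyway; the commit may or may not land
        }
    }
}
```

With only a callback API, the application would have to build this deadline bookkeeping itself.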
On the muting implementation, I'm not sure I'm convinced it's *required* to mute the others entirely. Couldn't Selector.poll have a mechanism for prioritizing reads rather than completely muting the other nodes? For example, what if poll() did one select for just the key we're currently keeping unmuted with the timeout, then, if there are no keys ready to read, reregistered interest and did a select(0) to only get the ones that are immediately ready.

Jay -- the polling thing isn't an issue if you just poll from within the Future. That's how the WIP patch works. My only concern with that is that it's unintuitive, because most future implementations are just waiting for another thread to finish some operation; the only potentially bad side effect I could think of is that user callbacks might then run in the thread calling Future.get(), which might be unexpected if they think they are doing all the work of polling in a different thread.

-Ewen

On Wed, Apr 22,
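The two-phase poll Ewen floats above can be sketched directly against java.nio. This is a rough illustration of the prioritization idea, not Kafka's Selector; error handling and key bookkeeping are elided, and all names are assumptions:

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;

// Rough sketch: wait (bounded) on the priority key only, then do a
// non-blocking pass that picks up whatever else is already readable.
class PrioritizedPoll {
    static Set<SelectionKey> poll(Selector selector, SelectionKey priority,
                                  long timeoutMs) throws IOException {
        // Phase 1: only the priority key keeps read interest; the others are
        // temporarily quieted (interest ops cleared, keys not cancelled).
        for (SelectionKey k : selector.keys())
            k.interestOps(k == priority ? SelectionKey.OP_READ : 0);
        selector.selectedKeys().clear();
        selector.select(timeoutMs);
        if (!selector.selectedKeys().isEmpty())
            return selector.selectedKeys();
        // Phase 2: restore read interest everywhere and take only what is
        // ready right now (selectNow() is the select(0) in Ewen's description).
        for (SelectionKey k : selector.keys())
            k.interestOps(SelectionKey.OP_READ);
        selector.selectNow();
        return selector.selectedKeys();
    }
}
```

Unlike full muting, data arriving on the other connections is never dropped from consideration; it is simply not waited on.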
Re: [DISCUSS] New consumer offset commit API
> same time I would prefer additional churn to the API.

I meant to say prefer _less_ churn to the API.

On Tue, Jun 09, 2015 at 08:35:35AM -0700, Joel Koshy wrote:
> Ewen, Sorry for the late comment, but while we are discussing this API I think we should also consider the addition of metadata to the offset commit. [...]
Kafka KIP hangout June 9
Hi, Everyone,

We will have a KIP hangout at 11 PST on June 9. The following is the agenda. If you want to attend and are not on the invite, please let me know.

Agenda:
- KIP-12 (SASL/SSL authentication): status check
- KIP-4: keeping ISR in TopicMetadataResponse
- KIP-25 (system test)

Thanks,
Jun
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ ---
(Updated June 9, 2015, 5:07 p.m.)

Review request for kafka, Joel Koshy and Jun Rao.

Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136

Repository: kafka

Description (updated)
---
Addressing Joel's comments
Merging

Diffs (updated)
-
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8
core/src/main/scala/kafka/api/FetchRequest.scala 5b38f8554898e54800abd65a7415dd0ac41fd958
core/src/main/scala/kafka/api/FetchResponse.scala 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215
core/src/main/scala/kafka/api/ProducerRequest.scala c866180d3680da03e48d374415f10220f6ca68c4
core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330
core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db
core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda
core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd
core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3
core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79

Diff: https://reviews.apache.org/r/33378/diff/

Testing
---
New tests added

Thanks,
Aditya Auradkar
[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14579233#comment-14579233 ]

Aditya A Auradkar commented on KAFKA-2136:
--
Updated reviewboard https://reviews.apache.org/r/33378/diff/ against branch origin/trunk

Client side protocol changes to return quota delays
---
Key: KAFKA-2136
URL: https://issues.apache.org/jira/browse/KAFKA-2136
Project: Kafka
Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, KAFKA-2136_2015-05-06_18:35:54.patch, KAFKA-2136_2015-05-11_14:50:56.patch, KAFKA-2136_2015-05-12_14:40:44.patch, KAFKA-2136_2015-06-09_10:07:13.patch

As described in KIP-13, evolve the protocol to return a throttle_time_ms in the Fetch and the ProduceResponse objects. Add client side metrics on the new producer and consumer to expose the delay time.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
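The protocol change the JIRA describes (responses carrying throttle_time_ms) boils down to a field like the following. This is an illustrative shape only, not the actual FetchResponse/ProduceResponse schema from the patch:

```java
// Illustrative only: a response object exposing how long the broker delayed
// this request under KIP-13 quotas. The real wire layout lives in Protocol.java.
class ThrottledResponse {
    final int throttleTimeMs; // 0 means the client was not throttled

    ThrottledResponse(int throttleTimeMs) {
        this.throttleTimeMs = throttleTimeMs;
    }

    // Clients can feed this value into avg/max throttle-time metrics.
    boolean wasThrottled() {
        return throttleTimeMs > 0;
    }
}
```

Because existing brokers never set the field, the patch initially publishes a zero delay, which reads as "not throttled" on the client side.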
[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aditya A Auradkar updated KAFKA-2136:
--
Attachment: KAFKA-2136_2015-06-09_10:07:13.patch
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ ---
(Updated June 9, 2015, 5:08 p.m.)

Review request for kafka, Joel Koshy and Jun Rao.

Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136

Repository: kafka

Description (updated)
---
Changes are
- protocol changes to the fetch request and response to return the throttle_time_ms to clients
- New producer/consumer metrics to expose the avg and max delay time for a client
- Test cases.
- Addressed Joel's comments

For now the patch will publish a zero delay and return a response.

Diffs
-
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8
core/src/main/scala/kafka/api/FetchRequest.scala 5b38f8554898e54800abd65a7415dd0ac41fd958
core/src/main/scala/kafka/api/FetchResponse.scala 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215
core/src/main/scala/kafka/api/ProducerRequest.scala c866180d3680da03e48d374415f10220f6ca68c4
core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330
core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db
core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda
core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd
core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3
core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79

Diff: https://reviews.apache.org/r/33378/diff/

Testing
---
New tests added

Thanks,
Aditya Auradkar
[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14579241#comment-14579241 ]

Aditya A Auradkar commented on KAFKA-2136:
--
Updated reviewboard https://reviews.apache.org/r/33378/diff/ against branch origin/trunk
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ ---
(Updated June 9, 2015, 5:10 p.m.)

Review request for kafka, Joel Koshy and Jun Rao.

Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136

Repository: kafka

Description (updated)
---
Addressing Joel's comments
Merging
Changing variable name

Diffs (updated)
-
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8
core/src/main/scala/kafka/api/FetchRequest.scala 5b38f8554898e54800abd65a7415dd0ac41fd958
core/src/main/scala/kafka/api/FetchResponse.scala 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215
core/src/main/scala/kafka/api/ProducerRequest.scala c866180d3680da03e48d374415f10220f6ca68c4
core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330
core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db
core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda
core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd
core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3
core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79

Diff: https://reviews.apache.org/r/33378/diff/

Testing
---
New tests added

Thanks,
Aditya Auradkar
[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aditya A Auradkar updated KAFKA-2136:
--
Attachment: KAFKA-2136_2015-06-09_10:10:25.patch
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ ---
(Updated June 9, 2015, 5:10 p.m.)

Review request for kafka, Joel Koshy and Jun Rao.

Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136

Repository: kafka

Description (updated)
---
Changes are
- protocol changes to the fetch request and response to return the throttle_time_ms to clients
- New producer/consumer metrics to expose the avg and max delay time for a client
- Test cases.
- Addressed Joel's comments

For now the patch will publish a zero delay and return a response.

Diffs
-
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8
core/src/main/scala/kafka/api/FetchRequest.scala 5b38f8554898e54800abd65a7415dd0ac41fd958
core/src/main/scala/kafka/api/FetchResponse.scala 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215
core/src/main/scala/kafka/api/ProducerRequest.scala c866180d3680da03e48d374415f10220f6ca68c4
core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330
core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db
core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda
core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd
core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3
core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79

Diff: https://reviews.apache.org/r/33378/diff/

Testing
---
New tests added

Thanks,
Aditya Auradkar
KIP Call Notes
Shortest KIP discussion ever :) Here are my notes. Feel free to correct or expand:

SSL: Harsh is rebasing after KAFKA-1928 and resolving an issue with support for a specific API.
KIP-4 - ISR in Metadata request: We prefer to preserve this information.
KIP-25 - System Tests: Geoffrey posted an update with a discussion summary; there were no objections to the proposal. We will wait a day before starting a vote.
New consumer: Separate the API changes for metadata into a new JIRA. Since the new consumer is still work in progress, there's no need for a KIP for every modification.
Re: Review Request 33378: Patch for KAFKA-2136
On June 5, 2015, 2:43 a.m., Joel Koshy wrote: Overall, looks good. General comment on the naming: delay vs throttle. Personally, I prefer throttle - I think that is clearer from the client perspective. Should probably add a test to verify that the replica fetchers are never throttled. Or this may be more relevant for your other patch. I've changed the names to throttle everywhere. I'll add a replica fetcher test to my other patch - Aditya --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/#review86428 --- On June 9, 2015, 5:10 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ --- (Updated June 9, 2015, 5:10 p.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2136 https://issues.apache.org/jira/browse/KAFKA-2136 Repository: kafka Description --- Changes are - protocol changes to the fetch request and response to return the throttle_time_ms to clients - New producer/consumer metrics to expose the avg and max delay time for a client - Test cases. 
- Addressed Joel's comments For now the patch will publish a zero delay and return a response Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 core/src/main/scala/kafka/api/FetchRequest.scala 5b38f8554898e54800abd65a7415dd0ac41fd958 core/src/main/scala/kafka/api/FetchResponse.scala 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 core/src/main/scala/kafka/api/ProducerRequest.scala c866180d3680da03e48d374415f10220f6ca68c4 core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79 Diff: https://reviews.apache.org/r/33378/diff/ Testing --- New tests added Thanks, Aditya Auradkar
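The new client-side metrics described above (avg and max delay/throttle time) boil down to tracking two aggregates over the throttle_time_ms values returned in fetch/produce responses. A self-contained sketch in plain Java; the actual patch uses the Kafka metrics library (sensors/measurables), so the class and method names here are purely illustrative:

```java
// Illustrative tracker for the avg/max throttle-time metrics the patch
// describes. Not the real implementation: the patch wires these values into
// Kafka's metrics framework instead of a hand-rolled class like this.
public class ThrottleTimeTracker {
    private long count = 0;
    private long totalMs = 0;
    private long maxMs = 0;

    // Record the throttle_time_ms value carried in a broker response.
    public synchronized void record(long throttleTimeMs) {
        count++;
        totalMs += throttleTimeMs;
        maxMs = Math.max(maxMs, throttleTimeMs);
    }

    public synchronized double avgMs() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    public synchronized long maxMs() {
        return maxMs;
    }

    public static void main(String[] args) {
        ThrottleTimeTracker t = new ThrottleTimeTracker();
        t.record(0);   // unthrottled response
        t.record(30);  // broker delayed this response by 30 ms
        t.record(10);
        System.out.println(t.maxMs()); // prints 30
    }
}
```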
[jira] [Commented] (KAFKA-2232) make MockProducer generic
[ https://issues.apache.org/jira/browse/KAFKA-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14579324#comment-14579324 ] Jun Rao commented on KAFKA-2232: [~apakulov], we haven't officially moved to submitting patches through git pull requests. Could you attach the patch to the jira (perhaps using the patch-review tool)? Thanks, make MockProducer generic - Key: KAFKA-2232 URL: https://issues.apache.org/jira/browse/KAFKA-2232 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2.1 Reporter: Jun Rao Labels: newbie Fix For: 0.8.3 Currently, MockProducer implements Producer<byte[], byte[]>. Instead, we should implement MockProducer<K, V>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87257 --- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139570 Do we need this? There is no real guarantee on the poll time, so it seems that we could just return when wakeup is called. - Jay Kreps On June 5, 2015, 7:45 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 5, 2015, 7:45 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
[jira] [Updated] (KAFKA-2232) make MockProducer generic
[ https://issues.apache.org/jira/browse/KAFKA-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Pakulov updated KAFKA-2232: - Attachment: KAFKA-2232.patch make MockProducer generic - Key: KAFKA-2232 URL: https://issues.apache.org/jira/browse/KAFKA-2232 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2.1 Reporter: Jun Rao Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-2232.patch Currently, MockProducer implements Producer<byte[], byte[]>. Instead, we should implement MockProducer<K, V>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 35261: Patch for KAFKA-2232
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35261/ --- Review request for kafka. Bugs: KAFKA-2232 https://issues.apache.org/jira/browse/KAFKA-2232 Repository: kafka Description --- KAFKA-2232: Make MockProducer generic Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java e66491cc82f11641df6516e7d7abb4a808c27368 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 6372f1a7f7f77d96ba7be05eb927c004f7fefb73 clients/src/test/java/org/apache/kafka/test/MockSerializer.java e75d2e4e58ae0cdbe276d3a3b652e47795984791 Diff: https://reviews.apache.org/r/35261/diff/ Testing --- Thanks, Alexander Pakulov
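For readers following the change, the essence of making MockProducer generic is parameterizing the record key and value types instead of hard-coding byte[]. A heavily simplified, self-contained sketch; the real class implements the full Producer<K, V> interface and completion futures, so the fields and methods here are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a generic MockProducer<K, V>. Illustrative only:
// the real mock implements Producer<K, V> and returns futures from send().
public class MockProducer<K, V> {
    public static final class Record<K, V> {
        public final String topic;
        public final K key;
        public final V value;
        Record(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Record<K, V>> sent = new ArrayList<>();

    // Record the send synchronously, as a mock does.
    public void send(String topic, K key, V value) {
        sent.add(new Record<>(topic, key, value));
    }

    // Tests inspect everything that was "produced".
    public List<Record<K, V>> history() {
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        // Typed keys/values instead of raw byte arrays:
        MockProducer<String, Integer> producer = new MockProducer<>();
        producer.send("metrics", "requests", 42);
        System.out.println(producer.history().size()); // prints 1
    }
}
```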
[jira] [Commented] (KAFKA-2232) make MockProducer generic
[ https://issues.apache.org/jira/browse/KAFKA-2232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14579408#comment-14579408 ] Alexander Pakulov commented on KAFKA-2232: -- Created reviewboard https://reviews.apache.org/r/35261/diff/ against branch origin/trunk make MockProducer generic - Key: KAFKA-2232 URL: https://issues.apache.org/jira/browse/KAFKA-2232 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2.1 Reporter: Jun Rao Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-2232.patch Currently, MockProducer implements Producer<byte[], byte[]>. Instead, we should implement MockProducer<K, V>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 6:49 p.m., Jay Kreps wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 322 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line322 Do we need this? There is no real guarantee on the poll time, so it seems that we could just return when wakeup is called. You might be waking up from a synchronous commit, for example. In that case, all we can do is raise an exception. We could alternatively say that wakeup only applies to the poll() method and cannot be used to interrupt the other calls. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87257 --- On June 5, 2015, 7:45 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/ --- (Updated June 5, 2015, 7:45 p.m.) Review request for kafka. Bugs: KAFKA-2168 https://issues.apache.org/jira/browse/KAFKA-2168 Repository: kafka Description --- KAFKA-2168; refactored callback handling to prevent unnecessary requests KAFKA-2168; address review comments KAFKA-2168; fix rebase error and checkstyle issue KAFKA-2168; address review comments and add docs KAFKA-2168; handle polling with timeout 0 KAFKA-2168; timeout=0 means return immediately Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 Diff: https://reviews.apache.org/r/34789/diff/ Testing --- Thanks, Jason Gustafson
[Discussion] New Consumer API / Protocol
This email is to kick off some discussion around the changes we want to make to the new consumer APIs as well as their semantics. Here is a non-comprehensive list of items on my mind: 1. Poll(timeout): the current definition of timeout states "The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely." The current implementation, however, has different semantics than stated; for example: a) poll(timeout) can return before the timeout has elapsed with empty consumed data. b) poll(timeout) can return after more than the timeout has elapsed due to blocking events like join-group, coordinator discovery, etc. We should think a bit more about what semantics we really want to provide and how to provide them in the implementation. 2. Thread safety: currently we have a coarse-grained locking mechanism that provides thread safety but blocks commit / position / etc. calls while poll() is in progress. We are considering removing the coarse-grained locking, adding a Consumer.wakeup() call to break out of polling, and instead suggesting that users have one consumer client per thread, which aligns with the design of a single-threaded consumer (KAFKA-2123). 3. Commit(): we want to improve the async commit calls to add a callback handler invoked when the commit completes, and to guarantee ordering of commit calls with retry policies (KAFKA-2168). In addition, we want to extend the API to expose attaching / fetching offset metadata stored in the Kafka offset manager. 4.
OffsetFetchRequest: currently, when handling an OffsetCommitRequest, we check the generation id and the assigned partitions before accepting the request if the group is using Kafka for partition management, but for OffsetFetchRequest we cannot do this check since the request does not include groupId / consumerId information. Do people think this is OK, or should we add this as we did in OffsetCommitRequest? 5. New APIs: there are some other requests to add: a) offsetsBeforeTime(timestamp): or would seekToEnd and seekToBeginning be sufficient? b) listTopics(): or should we just require users to use AdminUtils for such operations? There may be other issues that I have missed here, so please bring up anything else you have thought of. -- Guozhang
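A rough sketch of the wakeup-based, one-consumer-per-thread pattern discussed in item 2. ToyConsumer is a stand-in for KafkaConsumer (the patch under review throws ConsumerWakeupException out of poll()); everything below is illustrative plain Java, not the actual client code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy illustration of the single-consumer-per-thread pattern with a wakeup()
// escape hatch. ToyConsumer stands in for KafkaConsumer; names are hypothetical.
public class WakeupDemo {
    static class WakeupException extends RuntimeException {}

    static class ToyConsumer {
        private final AtomicBoolean wakeup = new AtomicBoolean(false);

        // Blocks up to timeoutMs "waiting for records", checking the wakeup flag.
        String poll(long timeoutMs) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                if (wakeup.getAndSet(false))
                    throw new WakeupException();
                try { Thread.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
            return "no records"; // a spurious empty return is allowed
        }

        // Safe to call from another thread to break a blocked poll().
        void wakeup() { wakeup.set(true); }
    }

    public static void main(String[] args) throws Exception {
        ToyConsumer consumer = new ToyConsumer();
        Thread pollThread = new Thread(() -> {
            try {
                while (true)
                    consumer.poll(1000); // event loop
            } catch (WakeupException e) {
                // shutdown path: clean up (e.g. close()) and exit the loop
            }
        });
        pollThread.start();
        consumer.wakeup();    // interrupt the loop from another thread
        pollThread.join(5000);
        System.out.println(pollThread.isAlive()); // prints false
    }
}
```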
Re: Review Request 34789: Patch for KAFKA-2168
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190 --- Thanks for the patch. A few comments below. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139565 Could we add an example of how to use the new wakeup() call, especially with closing the consumer properly? For example, does the consumer thread just catch the ConsumerWakeupException and then call close()? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139574 Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the end of the call, we expect the fetch offset to be set to the beginning. This is now changed to async, which doesn't match the intended behavior. We need to think through whether this matters or not. Ditto for seekToEnd(). clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139577 Should we pass in tp to isOffsetResetNeeded()? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139509 The returned response may be ready already after the offsetBefore call due to needing a metadata refresh. Since we don't check the ready state immediately afterward, we may be delaying the processing of the metadata refresh by the request timeout. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139564 Currently, our coding convention is not to wrap single-line statements with {}. There are a few other cases like this. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139544 In the async mode, the response may not be ready in the first iteration. Are we handling the retry properly in that case?
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139545 We probably need to make some changes here when KAFKA-2120 is done to handle the request timeout properly. Perhaps we can add a TODO comment here. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/34789/#comment139563 -1 makes the pollClient block forever. So, we don't get a chance to do the wakeup check. clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java https://reviews.apache.org/r/34789/#comment139588 Do we need NONE? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java https://reviews.apache.org/r/34789/#comment139579 We probably can make it clear that this is a non-blocking call and doesn't wait for the request to be sent or the response to be received. It would be good to do that on other similar methods too. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java https://reviews.apache.org/r/34789/#comment139537 Should asynchronous be synchronous? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java https://reviews.apache.org/r/34789/#comment139586 Similar to the above, it would be useful to make it clear that fetchOffsets() is non-blocking. clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java https://reviews.apache.org/r/34789/#comment139534 Perhaps we can define a static method to initialize the constant and set the state. It's clearer that way since the instantiation and the initialization are in the same place. With this, we probably don't need the static getter methods and can just let the caller use the static constants directly. Ditto for BrokerResult. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java https://reviews.apache.org/r/34789/#comment139540 The comment seems inaccurate.
We are not returning an error, but returning a remedy instead. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java https://reviews.apache.org/r/34789/#comment139504 Do we need both hasRemedy and hasException? It seems that if one returns true, the other should always return false. clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java https://reviews.apache.org/r/34789/#comment139578 To be consistent with the naming convention of the rest of the methods, should we just name it offsetResetNeeded()? - Jun Rao On June 5, 2015, 7:45 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit:
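Jun's suggestion for CoordinatorResult (initialize shared constants through a static factory, and expose the constants directly instead of through getters) might look roughly like this; the class shape and names are illustrative, not the actual patch:

```java
// Sketch of the static-factory pattern Jun suggests: instantiation and
// initialization live in one place, and callers use the constant directly.
// Names are hypothetical stand-ins for the CoordinatorResult in the patch.
public class CoordinatorResult<T> {
    enum State { COMPLETE, NEEDS_COORDINATOR }

    // Shared constant, built by the same factory callers use.
    public static final CoordinatorResult<Object> NEED_COORDINATOR = needsCoordinator();

    private final State state;
    private final T value;

    private CoordinatorResult(State state, T value) {
        this.state = state;
        this.value = value;
    }

    // Static factories keep construction and state-setting together.
    public static <T> CoordinatorResult<T> complete(T value) {
        return new CoordinatorResult<>(State.COMPLETE, value);
    }

    public static <T> CoordinatorResult<T> needsCoordinator() {
        return new CoordinatorResult<>(State.NEEDS_COORDINATOR, null);
    }

    public boolean isComplete() { return state == State.COMPLETE; }
    public T value() { return value; }

    public static void main(String[] args) {
        System.out.println(CoordinatorResult.complete("done").isComplete()); // prints true
    }
}
```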
Re: [Discussion] New Consumer API / Protocol
Thanks for kicking off this discussion, Guozhang. We might also want to discuss the API to expose the high watermark. There has been some discussion of this in KAFKA-2076. Thanks, Jiangjie (Becket) Qin On 6/9/15, 1:12 PM, Guozhang Wang wangg...@gmail.com wrote: This email is to kick off some discussion around the changes we want to make to the new consumer APIs as well as their semantics. Here is a non-comprehensive list of items on my mind: 1. Poll(timeout): the current definition of timeout states "The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely." The current implementation, however, has different semantics than stated; for example: a) poll(timeout) can return before the timeout has elapsed with empty consumed data. b) poll(timeout) can return after more than the timeout has elapsed due to blocking events like join-group, coordinator discovery, etc. We should think a bit more about what semantics we really want to provide and how to provide them in the implementation. 2. Thread safety: currently we have a coarse-grained locking mechanism that provides thread safety but blocks commit / position / etc. calls while poll() is in progress. We are considering removing the coarse-grained locking, adding a Consumer.wakeup() call to break out of polling, and instead suggesting that users have one consumer client per thread, which aligns with the design of a single-threaded consumer (KAFKA-2123). 3. Commit(): we want to improve the async commit calls to add a callback handler invoked when the commit completes, and to guarantee ordering of commit calls with retry policies (KAFKA-2168). In addition, we want to extend the API to expose attaching / fetching offset metadata stored in the Kafka offset manager. 4.
OffsetFetchRequest: currently, when handling an OffsetCommitRequest, we check the generation id and the assigned partitions before accepting the request if the group is using Kafka for partition management, but for OffsetFetchRequest we cannot do this check since the request does not include groupId / consumerId information. Do people think this is OK, or should we add this as we did in OffsetCommitRequest? 5. New APIs: there are some other requests to add: a) offsetsBeforeTime(timestamp): or would seekToEnd and seekToBeginning be sufficient? b) listTopics(): or should we just require users to use AdminUtils for such operations? There may be other issues that I have missed here, so please bring up anything else you have thought of. -- Guozhang
Re: [Discussion] New Consumer API / Protocol
Just a minor correction, but #2 is KAFKA-2168 and #3 is KAFKA-2123. For #1, I think there should be some minimal effort to making the poll respect the timeout (with the understanding that spurious wakeups can happen). I think this really just means calling NetworkClient.poll() in a loop and checking if there are any results to return. An additional complication is that long poll timeouts can prevent heartbeats or auto-committed offsets from being sent. This means that the maximum time that the consumer should actually block in NetworkClient.poll() should be the minimum of the poll timeout, the heartbeat interval, and the auto-commit interval (if it's set). I attempted to handle this in my patch for KAFKA-2168, which is pending review. I tend to agree with Jay on 1b though. Trying to leave the JoinGroup pending could get tricky. On Tue, Jun 9, 2015 at 4:25 PM, Jay Kreps jay.kr...@gmail.com wrote: My two cents: Overall I think our focus as far as extensions go should be on (1) things which would be incompatible if changed later and (2) operationalization. There are lots of new apis that could be nice to have, but I think if adding them later will just be an api addition we should hold off and get out what we have, if it will be ugly to add later then let's try to get it in now. 1a. I don't think there is too much value in attempting to avoid spurious wakeups. Several people have asked about this but I think the only usage that makes sense is in an event loop--since you can always get 0 events due to a timeout. I think we should just document that the timeout is just there for guidance and not strictly enforced. I think this is intuitive since no timeout like this is ever really strictly enforced (if there is a 30 second gc pause we will be off by 30 seconds regardless of how diligent in our own code) 1b. Same here. I think we should just document this. Trying to return in the middle of a joinGroup will be very complex so I think we should just document this. 
Also: The docs currently say that a timeout of 0 blocks forever but I think that might be wrong. Regardless I think for consistency a timeout of 0 should be non-blocking and a timeout of MAX_INT should be used to block forever. 2. Here is my understanding of this one. I think on this there was originally some interest in tightening up the locking to allow more parallelism in consumer usage. Personally I think this adds a lot of complexity and would prefer to avoid it. My concern is primarily around implementation complexity--I think without a very careful, well abstracted threading model a big chunk of code with lots of ad hoc locking, even if perfect when initially written, is just very hard to maintain. I also think the single threaded usage pattern is also easier for the end-user and likely faster, though there are some downsides. After some discussion I think there was a second proposal to instead leave the locking as is and add a wakeup call. The use case for this would be something like quickly interrupting and shutting down a thread that is in its event loop. I think this makes sense though we could probably release a v1 without it if need be. I think an open question here is whether interrupt needs to work for all calls (e.g. commit) or just poll, with other calls having a de facto timeout from the request timeout and retry count. I would vote for the latter if it helps with implementation simplicity. 3. I think both these extensions make sense. It would be nice to get them into the first release to avoid api churn. 4. Not sure I fully understand. 5a. The rationale for doing seekToBeginning and seekToEnd was that we felt that we might need to think a little bit about the offset list api a bit more before knowing how best to expose it. We clearly need something here but the hope was the two seekTo calls were good enough to get started and we could wait to think it out properly to add the more general call.
I think the thinking was basically (a) the time mechanism for the intermediate segments is often wrong, (b) the protocol that does the fetch is quite ad hoc and perhaps there is more per-segment info that should be in that request, and (c) the api in simple consumer is very unintuitive. If we are adding an end-state solution 5b. Not sure but can we add it later? How would mm work? On Tue, Jun 9, 2015 at 1:12 PM, Guozhang Wang wangg...@gmail.com wrote: This email is to kick off some discussion around the changes we want to make on the new consumer APIs as well as their semantics. Here is a non-comprehensive list of items on my mind: 1. Poll(timeout): current definition of timeout states The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. While in the current implementation, we have different semantics as stated, for example: a) poll(timeout) can return before timeout elapsed with empty consumed data. b)
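Jason's point above, that the time the consumer actually blocks in NetworkClient.poll() must be capped by the poll timeout, the heartbeat interval, and the auto-commit interval, can be sketched as follows; the method and parameter names are illustrative, not the real implementation:

```java
// Sketch of the block-time bound Jason describes: the consumer may not sleep
// past the caller's poll timeout, the next due heartbeat, or the next due
// auto-commit. Purely illustrative; names are hypothetical.
public class PollTimeouts {
    static long effectiveBlockMs(long pollTimeoutMs,
                                 long timeToNextHeartbeatMs,
                                 long timeToNextAutoCommitMs,
                                 boolean autoCommitEnabled) {
        long block = Math.min(pollTimeoutMs, timeToNextHeartbeatMs);
        if (autoCommitEnabled)
            block = Math.min(block, timeToNextAutoCommitMs);
        return Math.max(block, 0); // never block for a negative duration
    }

    public static void main(String[] args) {
        // Caller asks for a 30s poll, but a heartbeat is due in 3s:
        System.out.println(effectiveBlockMs(30_000, 3_000, 5_000, true)); // prints 3000
    }
}
```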
Re: [Discussion] New Consumer API / Protocol
My two cents: Overall I think our focus as far as extensions go should be on (1) things which would be incompatible if changed later and (2) operationalization. There are lots of new apis that could be nice to have, but I think if adding them later will just be an api addition we should hold off and get out what we have, if it will be ugly to add later then let's try to get it in now. 1a. I don't think there is too much value in attempting to avoid spurious wakeups. Several people have asked about this but I think the only usage that makes sense is in an event loop--since you can always get 0 events due to a timeout. I think we should just document that the timeout is just there for guidance and not strictly enforced. I think this is intuitive since no timeout like this is ever really strictly enforced (if there is a 30 second gc pause we will be off by 30 seconds regardless of how diligent our own code is) 1b. Same here. I think we should just document this. Trying to return in the middle of a joinGroup will be very complex so I think we should just document this. Also: The docs currently say that a timeout of 0 blocks forever but I think that might be wrong. Regardless I think for consistency a timeout of 0 should be non-blocking and a timeout of MAX_INT should be used to block forever. 2. Here is my understanding of this one. I think on this there was originally some interest in tightening up the locking to allow more parallelism in consumer usage. Personally I think this adds a lot of complexity and would prefer to avoid it. My concern is primarily around implementation complexity--I think without a very careful, well abstracted threading model a big chunk of code with lots of ad hoc locking, even if perfect when initially written, is just very hard to maintain. I also think the single threaded usage pattern is also easier for the end-user and likely faster, though there are some downsides.
After some discussion I think there was a second proposal to instead leave the locking as is and add a wakeup call. The use case for this would be something like quickly interrupting and shutting down a thread that is in its event loop. I think this makes sense though we could probably release a v1 without it if need be. I think an open question here is whether interrupt needs to work for all calls (e.g. commit) or just poll, with other calls having a de facto timeout from the request timeout and retry count. I would vote for the latter if it helps with implementation simplicity. 3. I think both these extensions make sense. It would be nice to get them into the first release to avoid api churn. 4. Not sure I fully understand. 5a. The rationale for doing seekToBeginning and seekToEnd was that we felt that we might need to think a little bit about the offset list api a bit more before knowing how best to expose it. We clearly need something here but the hope was the two seekTo calls were good enough to get started and we could wait to think it out properly to add the more general call. I think the thinking was basically (a) the time mechanism for the intermediate segments is often wrong, (b) the protocol that does the fetch is quite ad hoc and perhaps there is more per-segment info that should be in that request, and (c) the api in simple consumer is very unintuitive. If we are adding an end-state solution 5b. Not sure but can we add it later? How would mm work? On Tue, Jun 9, 2015 at 1:12 PM, Guozhang Wang wangg...@gmail.com wrote: This email is to kick off some discussion around the changes we want to make on the new consumer APIs as well as their semantics. Here is a non-comprehensive list of items on my mind: 1. Poll(timeout): current definition of timeout states The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely.
While in the current implementation, we have different semantics as stated, for example: a) poll(timeout) can return before timeout elapsed with empty consumed data. b) poll(timeout) can return after more than timeout elapsed due to blocking events like join-group, coordinator discovery, etc. We should think a bit more on what semantics we really want to provide and how to provide it in implementation. 2. Thread safety: currently we have a coarse-grained locking mechanism that provides thread safety but blocks commit / position / etc. calls while poll() is in process. We are considering removing the coarse-grained locking with an additional Consumer.wakeup() call to break the polling, and instead suggest users have one consumer client per thread, which aligns with the design of a single-threaded consumer (KAFKA-2123). 3. Commit(): we want to improve the async commit calls to add a callback handler invoked when the commit completes, and guarantee ordering of commit calls with retry policies (KAFKA-2168). In addition, we want to extend the API to expose attaching / fetching offset metadata stored in the Kafka offset
Re: Review Request 34789: Patch for KAFKA-2168
On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 995 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line995 Should we pass in tp to isOffsetResetNeeded()? Yes, we should. I'll fix it. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 797-798 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797 Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the end of the call, we expect the fetch offset to be set to the beginning. This is now changed to async, which doesn't match the intended behavior. We need to think through whether this matters or not. Ditto for seekToEnd(). Since we always update fetch positions before a new fetch and in position(), it didn't seem necessary to make it synchronous. I thought this handling might be more consistent with how new subscriptions are handled (which are asynchronous and defer the initial offset fetch until the next poll or position). That being said, I don't have a strong feeling about it, so we could return to the blocking version. On June 9, 2015, 7:58 p.m., Jun Rao wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1039-1040 https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039 The returned response may be ready already after the offsetBefore call due to needing a metadata refresh. Since we don't check the ready state immediately afterward, we may be delaying the processing of the metadata refresh by the request timeout. This is a pretty good point. One of the reasons working with NetworkClient is tricky is that you need several polls to complete a request: one to connect, one to send, and one to receive. In this case, the result might not be ready because we are in the middle of connecting to the broker, in which case we need to call poll() to finish the connect.
If we don't, the next request will just fail for the same reason. I'll look to see if there's a way to fix this to avoid unnecessary calls to poll().

On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1139-1141
https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1139
In the async mode, the response may not be ready in the first iteration. Are we handling the retry properly in that case?

When an async commit request fails, we do not retry, which is consistent with the current consumer. I think Ewen's patch for KAFKA-2123 introduces a good approach to retrying async commits. My own preference is to let them fail fast as long as the user has the callback from KAFKA-2123 to handle the failure. Otherwise, it gets a little tricky trying to preserve their order.

On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1195
https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1195
We probably need to make some changes here when KAFKA-2120 is done to handle the request timeout properly. Perhaps we can add a TODO comment here.

I'll add a note. This also falls under the purview of KAFKA-1894.

On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java, line 16
https://reviews.apache.org/r/34789/diff/8/?file=980077#file980077line16
Do we need NONE?

It was there before, but I don't think it's actually used. I'd be fine removing it.

On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java, lines 20-27
https://reviews.apache.org/r/34789/diff/8/?file=980080#file980080line20
Perhaps we can define a static method to initialize the constant and set the state. It's clearer that way since the instantiation and the initialization are in the same place.
With this, we probably don't need the static getter methods and can just let the caller use the static constants directly. Ditto for BrokerResult.

Agreed. I'll fix it.

On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1212
https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212
-1 makes the pollClient block forever. So, we don't get a chance to do the wakeup check.

I might be wrong, but I think we can still use NetworkClient.wakeup to interrupt a poll call that is waiting forever.

On June 9, 2015, 7:58 p.m., Jun Rao wrote:

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, lines 1078-1080
https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1078
Query on kafka broker and topic metadata
Hi, I am new to Kafka and have a question: how can I read statistics for a specific broker and topic from the Kafka server? I would like to read the following parameters for an existing topic:

1) activeMessages
2) activeSubscriptions
3) totalMessages
4) totalSubscriptions
5) deliveryFaults
6) pendingDelivery

Any pointers would be appreciated.

Thanks and regards,
Pavan