RE: [KIP-DISCUSSION] KIP-13 Quotas
So, are we all in agreement on this approach? If so, I can start working on a patch for it.

Also, there are a couple of patches for quotas in need of review. I think we need a couple of reviewers, so I've added Jun as a reviewer in addition to Joel.
https://reviews.apache.org/r/33049/
https://reviews.apache.org/r/33378/

Thanks,
Aditya

From: Joel Koshy [jjkosh...@gmail.com]
Sent: Friday, April 24, 2015 5:34 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

I think Jay meant a catch-all request/sec limit on all requests per-client. That makes sense.

On Fri, Apr 24, 2015 at 11:02:29PM +0000, Aditya Auradkar wrote:
I think Joel's suggestion is quite good. It's still possible to throttle other types of requests using the purgatory, but we would need a separate purgatory and DelayedOperation variants for the different request types, or perhaps a new ThrottledOperation type. It also addresses a couple of special cases around delay time and replication timeouts.

Jay, if we have a general mechanism for delaying requests, then it should be possible to throttle any type of request as long as we have per-client metrics. For offset commit requests, we would simply need a per-client request rate metric and a good default quota.

Thanks,
Aditya

From: Jay Kreps [jay.kr...@gmail.com]
Sent: Friday, April 24, 2015 3:20 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hey Jun/Joel,

Yeah, we will definitely want to quota non-produce/consume requests. Offset commit especially, and any other request the consumer can trigger, could easily get invoked in a tight loop by accident. We haven't talked about this a ton, but presumably the mechanism for all of these would just be a general requests/sec limit that covers all requests?

-Jay

On Fri, Apr 24, 2015 at 2:18 PM, Jun Rao [j...@confluent.io] wrote:
Joel,

What you suggested makes sense. Not sure there is a strong need to throttle TMR though, since it should be infrequent.

Thanks,
Jun

On Tue, Apr 21, 2015 at 12:21 PM, Joel Koshy [jjkosh...@gmail.com] wrote:
Given the caveats, it may be worth investigating the alternate approach further, which is to use a dedicated DelayQueue for requests that violate quota, and comparing pros and cons.

The approach is the following: all request handling occurs normally (i.e., unchanged from what we do today; the purgatories are unchanged). After handling a request and before sending the response, check whether the request has violated a quota. If so, enqueue the response into a DelayQueue. All responses can share the same DelayQueue. Send those responses out after the delay has elapsed.

There are some benefits to doing this:

- We will eventually want to quota other requests as well. The above is a clean, staged approach that works uniformly for all requests: parse request -> handle request normally -> check quota -> hold in delay queue if quota violated -> respond. All requests can share the same DelayQueue. (In contrast, with the current proposal we could end up with a bunch of purgatories, or a combination of purgatories and delay queues.)

- Since this approach needs no fundamental modifications to the current request handling, it addresses the caveat that Adi noted (holding producer/fetch requests longer than strictly necessary when the quota is violated, since the proposal was to not watch on keys in that case). Likewise it addresses the caveat that Guozhang noted (we may return no error if the request is held long enough due to quota violation, and so satisfy a producer request that has in fact exceeded the ack timeout), although it is probably reasonable to hide this case from the user.

- By avoiding those caveats it also avoids the suggested work-around, which is effectively to add a min-hold-time to the purgatory. Although that is not a lot of code, it adds a quota-driven feature to the purgatory, which is already non-trivial and should ideally remain unassociated with quota enforcement.

For this to work well we need to be sure that we don't hold a lot of data in the DelayQueue, and therein lies a quirk of this approach. Producer responses (and most other responses) are very small, so there is no issue. Fetch responses are fine as well, since we read off a FileMessageSet in the response (zero-copy). This will remain true even when we support SSL, since encryption occurs at the session layer (not the application layer). Topic metadata responses can be a problem, though. For those we ideally want to build the response only when we are ready to respond. So for metadata-style responses, which could contain large response objects, we may want to put the quota check and delay queue _before_ handling the request.
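The staged pipeline Joel describes (handle normally, check quota, hold the response in one shared queue) maps naturally onto java.util.concurrent.DelayQueue. A minimal sketch, assuming a hypothetical ThrottledResponse wrapper in place of a real Kafka response object:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class ThrottledResponseQueue {
    // A response whose send is deferred until its quota delay elapses.
    static class ThrottledResponse implements Delayed {
        final String payload;
        final long sendAtNanos;

        ThrottledResponse(String payload, long delayMs) {
            this.payload = payload;
            this.sendAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(sendAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // All throttled responses, regardless of request type, share one queue.
        DelayQueue<ThrottledResponse> queue = new DelayQueue<>();
        queue.put(new ThrottledResponse("fetch-response", 50));
        queue.put(new ThrottledResponse("produce-response", 10));

        // take() blocks until the head's delay has elapsed, so responses
        // are released in delay order, not insertion order.
        System.out.println(queue.take().payload);  // produce-response
        System.out.println(queue.take().payload);  // fetch-response
    }
}
```

A network thread (or a dedicated sender) would loop on take() and write each released response back to its client, which is why the approach needs held responses to stay small.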
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Joel/Aditya,

Yeah, that's what I meant. I agree it is easy to do. Just wanted to make sure folks had thought about that case.

-Jay

On Fri, Apr 24, 2015 at 5:34 PM, Joel Koshy [jjkosh...@gmail.com] wrote:
I think Jay meant a catch-all request/sec limit on all requests per-client. That makes sense.
Re: [KIP-DISCUSSION] KIP-13 Quotas
To turn it on/off we can just add a clear config (quota.enforcement.enabled, or similar).

Thanks,
Joel

On Fri, Apr 24, 2015 at 06:27:22PM -0400, Gari Singh wrote:
If we can't disable it, then can we use the tried and true method of using -1 to indicate that no throttling should take place?

On Tue, Apr 21, 2015 at 4:38 PM, Joel Koshy [jjkosh...@gmail.com] wrote:
In either approach I'm not sure we considered being able to turn it off completely. IOW, no, it is not a plugin, if that's what you are asking. We can ship very high defaults, and in the absence of any overrides it would effectively be off.

The quota enforcement is actually already part of the metrics package. The new code (that exercises it) will be added wherever the metrics are measured.

Thanks,
Joel

On Tue, Apr 21, 2015 at 03:04:07PM -0400, Tong Li wrote:
Joel,

Nice write-up. A couple of questions; not sure if they have been answered. Since we will have a call later today, I would like to ask here as well, so that we can talk about them if they are not answered in the email discussion.

1. Where will the new code be plugged in, that is, where is the plugin point: KafkaApi?
2. Can this quota control be enabled/disabled without affecting anything else? From the design wiki page, it seems to me that each request will at least pay the penalty of checking whether quotas are enabled.

Thanks.

Tong Li
OpenStack Kafka Community Development
Building 501/B205
liton...@us.ibm.com
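The on/off semantics discussed here (a -1 sentinel per Gari, or defaults high enough to be effectively off per Joel) could look roughly like the sketch below. QuotaConfig, UNLIMITED, and the throttle-time formula are illustrative assumptions, not the actual KIP-13 design:

```java
public class QuotaConfig {
    // Hypothetical sentinel: -1 means no throttling takes place at all.
    static final long UNLIMITED = -1L;

    final long bytesPerSecond;

    QuotaConfig(long bytesPerSecond) {
        this.bytesPerSecond = bytesPerSecond;
    }

    // The per-request cost when quotas are off is just this one check.
    boolean enabled() {
        return bytesPerSecond != UNLIMITED;
    }

    // Delay (ms) needed to bring an observed rate back under quota;
    // zero when throttling is disabled or the client is within bounds.
    long throttleTimeMs(double observedBytesPerSec, long windowMs) {
        if (!enabled() || observedBytesPerSec <= bytesPerSecond) {
            return 0;
        }
        // Hold the response long enough that the average over the
        // measurement window falls back to the configured rate.
        return (long) (windowMs * (observedBytesPerSec / bytesPerSecond - 1));
    }
}
```

This also illustrates Tong Li's point: even with quotas disabled, every request pays the (tiny) cost of the enabled() check.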
Re: [KIP-DISCUSSION] KIP-13 Quotas
that the metric may not be 100% accurate. We may be able to limit the fetch size of each partition to what's in the original estimate.

For the produce request, I was thinking that another way to do this is to first figure out the quota_timeout. Then wait in the purgatory for quota_timeout with no key. If the request is not satisfied within quota_timeout and (request_timeout > quota_timeout), wait in the purgatory for (request_timeout - quota_timeout) with the original keys.

Thanks,
Jun

On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar [aaurad...@linkedin.com.invalid] wrote:
This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately.

1. Delayed produce requests - Here, the proposal is to reuse DelayedProduce objects and insert them into the purgatory with no watcher keys if the request is being throttled. The timeout used in the request should be Max(quota_delay_time, replication_timeout). In most cases the quota timeout should be greater than the existing timeout, but to be safe we can use the maximum of the two. Having no watch keys allows the operation to be enqueued directly into the timer and adds no overhead in terms of watching keys (which was a concern). In this case watch keys are not beneficial anyway, since the operation must be delayed for a fixed amount of time and there is no possibility of it completing before the timeout, i.e. tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer will call run(), which calls onComplete(). In onComplete(), the DelayedProduce can repeat the check from tryComplete() (only for acks=-1: whether all replicas have fetched up to a certain offset) and return the response immediately.

Code will be structured as follows in ReplicaManager.appendMessages():

  if (isThrottled) {
    val produce = new DelayedProduce(timeout)
    purgatory.tryCompleteElseWatch(produce, Seq())
  } else if (delayedRequestRequired()) {
    // insert into purgatory with watched keys for unthrottled requests
  }

In this proposal we avoid adding unnecessary watches, because there is no possibility of early completion, and this avoids the potential performance penalties we were concerned about earlier.

2. Delayed fetch requests - Similarly, the proposal is to reuse DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. The timeout used is Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, onComplete() is called and the operation proceeds normally, i.e. it performs a readFromLocalLog and returns a response. The caveat is that if the request is throttled but the throttle time is less than the max_wait timeout on the fetch request, the request will still be delayed for Max(quota_delay_time, max_wait_timeout), which may be more than strictly necessary (since we are not watching for satisfaction on any keys).

I added some test cases to DelayedOperationTest to verify that it is possible to schedule operations with no watcher keys. By inserting elements with no watch keys, the purgatory simply becomes a delay queue. It may also make sense to add a new API to the purgatory, delayFor(), that accepts an operation without any watch keys (thanks for the suggestion, Joel).

Thoughts?

Thanks,
Aditya

From: Guozhang Wang [wangg...@gmail.com]
Sent: Monday, April 13, 2015 7:27 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

I think KAFKA-2063 (bounding fetch response) is still under discussion, and may not get in in time with KAFKA-1927.

On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar [aaurad...@linkedin.com.invalid] wrote:
I think it's reasonable to batch the protocol changes together. In addition to the protocol changes, is someone actively driving the server-side changes/KIP process for KAFKA-2063?

Thanks,
Aditya

From: Jun Rao [j...@confluent.io]
Sent: Thursday, April 09, 2015 8:59 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Since we are also thinking about evolving the fetch request protocol in KAFKA-2063 (bound fetch response size), perhaps it's worth thinking through whether we can just evolve the protocol once.

Thanks,
Jun
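Aditya's observation that a purgatory operation with no watch keys degenerates into a pure timer can be modeled with a plain scheduled executor. This is only a sketch of the suggested delayFor() semantics; the DelayedOperation interface and class names here are hypothetical stand-ins for Kafka's internals:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayFor {
    // Minimal model of a DelayedOperation with no watch keys:
    // tryComplete() can never succeed early, so onComplete() fires
    // only when the timer expires; the purgatory acts as a delay queue.
    interface DelayedOperation {
        void onComplete();
    }

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // Analogue of the suggested purgatory.delayFor() API: schedule the
    // operation's completion after a fixed delay, watching no keys.
    void delayFor(DelayedOperation op, long delayMs) {
        timer.schedule(op::onComplete, delayMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timer.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        DelayFor purgatory = new DelayFor();
        CountDownLatch done = new CountDownLatch(1);

        // For a throttled produce request, the delay would be
        // Max(quota_delay_time, replication_timeout), per the proposal.
        long quotaDelayMs = 30, replicationTimeoutMs = 10;
        purgatory.delayFor(done::countDown, Math.max(quotaDelayMs, replicationTimeoutMs));

        done.await();  // completes once the fixed delay elapses
        System.out.println("operation completed after the quota delay");
        purgatory.shutdown();
    }
}
```

On completion the real DelayedProduce would re-run its tryComplete() check (for acks=-1) and send the response, as described above.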
Re: [KIP-DISCUSSION] KIP-13 Quotas
If we can't disable it, then can we use the tried and true method of using -1 to indicate that no throttling should take place?

From: Joel Koshy [jjkosh...@gmail.com]
To: dev@kafka.apache.org
Date: 04/21/2015 01:22 PM
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Topic metadata responses can be a problem, though. For those we ideally want to build the response only when we are ready to respond. So for metadata-style responses, which could contain large response objects, we may want to put the quota check and delay queue _before_ handling the request. So the design in this approach would need an amendment: provide a choice of where to put a request in the delay queue, either before handling or after handling (before the response). So:

- small request, large response: delay queue before handling
- large request, small response: delay queue after handling, before the response
- small request, small response: either is fine
- large request, large response: we really cannot do anything here, but we don't really have this scenario yet

So the design would look like this:
- parse request
- before handling the request, check if the quota is violated; if so, compute two delay numbers
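The request/response size matrix above amounts to a simple placement rule. A hedged sketch of that decision (the Placement enum is an assumption, and defaulting the large/large case to after-handling is an arbitrary choice, since the thread says there is no good option for it yet):

```java
public class DelayPoint {
    enum Placement { BEFORE_HANDLING, AFTER_HANDLING }

    // Encodes the matrix from the discussion: delay large responses
    // before they are built, so they are never held in memory; hold
    // small responses after handling, so work is done exactly once.
    static Placement placementFor(boolean largeRequest, boolean largeResponse) {
        if (largeResponse && !largeRequest) {
            // e.g. topic metadata: build the response only when ready to send
            return Placement.BEFORE_HANDLING;
        }
        if (!largeResponse) {
            // produce acks, zero-copy fetch: cheap to hold ("either is fine"
            // for small/small, so we pick after-handling for uniformity)
            return Placement.AFTER_HANDLING;
        }
        // large request, large response: no good option; the scenario
        // does not arise today, so default to after-handling
        return Placement.AFTER_HANDLING;
    }
}
```

The two delay numbers mentioned at the end of the message would presumably correspond to these two placement points.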
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think Jay meant a catch-all request/sec limit on all requests per-client. That makes sense.

On Fri, Apr 24, 2015 at 11:02:29PM +0000, Aditya Auradkar wrote:
I think Joel's suggestion is quite good. It's still possible to throttle other types of requests using the purgatory, but we would need a separate purgatory and DelayedOperation variants for the different request types, or perhaps a new ThrottledOperation type.
Re: [KIP-DISCUSSION] KIP-13 Quotas
On Wed, Apr 8, 2015 at 10:43 AM, Aditya Auradkar [aaurad...@linkedin.com.invalid] wrote:
Thanks for the detailed review. I've addressed your comments. For rejected alternatives, we rejected per-partition distribution because we chose client-based quotas, where there is no notion of partitions. I've explained this in a bit more detail in that section.

Aditya

From: Joel Koshy [jjkosh...@gmail.com]
Sent: Wednesday, April 08, 2015 6:30 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Thanks for updating the wiki. Looks great overall. Just a couple more comments: Client status code
Re: [KIP-DISCUSSION] KIP-13 Quotas
This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately.

1. Delayed Produce Requests - Here, the proposal is basically to reuse DelayedProduce objects and insert them into the purgatory with no watcher keys if the request is being throttled. The timeout used in the request should be Max(quota_delay_time, replication_timeout). In most cases the quota timeout should be greater than the existing timeout, but to be safe we can use the maximum of the two. Having no watch keys allows the operation to be enqueued directly into the timer and adds no overhead in terms of watching keys (which was a concern). In this case, watch keys bring no benefit since the operation must be delayed for a fixed amount of time and there is no possibility of the operation completing before the timeout, i.e. tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer calls run(), which calls onComplete(). In onComplete(), the DelayedProduce can repeat the check in tryComplete() (only if acks=-1: whether all replicas have fetched up to a certain offset) and return the response immediately.

Code will be structured as follows in ReplicaManager.appendMessages():

  if (isThrottled) {
    val produce = new DelayedProduce(timeout)
    purgatory.tryCompleteElseWatch(produce, Seq())
  } else if (delayedRequestRequired()) {
    // Insert into purgatory with watched keys for unthrottled requests
  }

In this proposal, we avoid adding unnecessary watches because there is no possibility of early completion, which avoids the potential performance penalties we were concerned about earlier.

2. Delayed Fetch Requests - Similarly, the proposal is to reuse DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. The timeout used is Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, onComplete() is called and the operation proceeds normally, i.e. performs a readFromLocalLog and returns a response. The caveat here is that if the request is throttled but the throttle time is less than the max_wait timeout on the fetch request, the request will still be delayed for Max(quota_delay_time, max_wait_timeout). This may be longer than strictly necessary (since we are not watching for satisfaction on any keys).

I added some testcases to DelayedOperationTest to verify that it is possible to schedule operations with no watcher keys. By inserting elements with no watch keys, the purgatory simply becomes a delay queue. It may also make sense to add a new API to the purgatory called delayFor() that accepts an operation without any watch keys (thanks for the suggestion, Joel). Thoughts?

Thanks,
Aditya

From: Guozhang Wang [wangg...@gmail.com] Sent: Monday, April 13, 2015 7:27 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

I think KAFKA-2063 (bounding fetch response) is still under discussion, and may not make it in time for KAFKA-1927.

On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: I think it's reasonable to batch the protocol changes together. In addition to the protocol changes, is someone actively driving the server side changes/KIP process for KAFKA-2063? Thanks, Aditya

From: Jun Rao [j...@confluent.io] Sent: Thursday, April 09, 2015 8:59 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Since we are also thinking about evolving the fetch request protocol in KAFKA-2063 (bound fetch response size), perhaps it's worth thinking through whether we can just evolve the protocol once. Thanks, Jun

On Wed, Apr 8, 2015 at 10:43 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Thanks for the detailed review. I've addressed your comments. For rejected alternatives, we've rejected per-partition distribution because we chose client-based quotas, where there is no notion of partitions. I've explained in a bit more detail in that section.
Aditya

From: Joel Koshy [jjkosh...@gmail.com] Sent: Wednesday, April 08, 2015 6:30 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Thanks for updating the wiki. Looks great overall. Just a couple more comments:

Client status code:
- v0 requests - current version (0) of those requests.
- Fetch response has a throttled flag instead of throttle time - I think you intended the latter.
- Can you make it clear that the quota status is a new field called throttleTimeMs (or equivalent)? It would help if some of that is moved (or repeated) in the compatibility/migration plan.
- So you would need to upgrade brokers first, then the clients
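The delayFor() idea from the proposal above - an operation with no watch keys can never complete early, so the purgatory degenerates into a plain delay queue - can be sketched with java.util.concurrent.DelayQueue. This is a hypothetical illustration, not broker code; the ThrottledOperation class and its fields are invented for the example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayForSketch {
    // Illustrative stand-in for a DelayedProduce/DelayedFetch inserted with
    // no watch keys: nothing can complete it early, only the timer fires it.
    static class ThrottledOperation implements Delayed {
        final long dueAtMs;
        ThrottledOperation(long delayMs) {
            this.dueAtMs = System.currentTimeMillis() + delayMs;
        }
        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<ThrottledOperation> purgatory = new DelayQueue<>();
        purgatory.put(new ThrottledOperation(50)); // "delayFor(op)": enqueue with no watch keys
        ThrottledOperation op = purgatory.take();  // blocks until the 50 ms delay expires
        // onComplete() would run here: re-check acks (produce) or readFromLocalLog (fetch)
        System.out.println(op.getDelay(TimeUnit.MILLISECONDS) <= 0); // true
    }
}
```

With no watch keys there is nothing to call tryComplete() before the deadline, which is exactly why the delay queue behavior falls out for free.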
Re: [KIP-DISCUSSION] KIP-13 Quotas
In either approach I'm not sure we considered being able to turn it off completely. IOW, no, it is not a plugin if that's what you are asking. We can set very high defaults by default, and in the absence of any overrides it would effectively be off. The quota enforcement is actually already part of the metrics package. The new code (that exercises it) will be added wherever the metrics are being measured.

Thanks, Joel

On Tue, Apr 21, 2015 at 03:04:07PM -0400, Tong Li wrote: Joel, Nice write up. Couple of questions, not sure if they have been answered. Since we will have a call later today, I would like to ask here as well so that we can talk about them if not responded to in the email discussion. 1. Where will the new code be plugged in, that is, where is the plugin point - KafkaApis? 2. Can this quota control be disabled/enabled without affecting anything else? From the design wiki page, it seems to me that each request will at least pay the penalty of checking quota enablement. Thanks. Tong Li, OpenStack Kafka Community Development, Building 501/B205, liton...@us.ibm.com

From: Joel Koshy jjkosh...@gmail.com To: dev@kafka.apache.org Date: 04/21/2015 01:22 PM Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Given the caveats, it may be worth doing further investigation on the alternate approach, which is to use a dedicated DelayQueue for requests that violate quota, and compare pros/cons. The approach is the following: all request handling occurs normally (i.e., unchanged from what we do today), so purgatories will be unchanged. After handling a request and before sending the response, check if the request has violated a quota. If so, enqueue the response into a DelayQueue. All responses can share the same DelayQueue. Send those responses out after the delay has been met.

There are some benefits to doing this:
- We will eventually want to quota other requests as well. The above seems to be a clean staged approach that should work uniformly for all requests: parse request -> handle request normally -> check quota -> hold in delay queue if quota violated -> respond. All requests can share the same DelayQueue. (In contrast, with the current proposal we could end up with a bunch of purgatories, or a combination of purgatories and delay queues.)
- Since this approach does not need any fundamental modifications to the current request handling, it addresses the caveats that Adi noted (holding producer/fetch requests longer than strictly necessary if quota is violated, since the proposal was to not watch on keys in that case). Likewise it addresses the caveat that Guozhang noted (we may return no error if the request is held long enough due to quota violation and satisfy a producer request that may have in fact exceeded the ack timeout), although it is probably reasonable to hide this case from the user.
- By avoiding the caveats it also avoids the suggested work-around, which is effectively to add a min-hold-time to the purgatory. Although this is not a lot of code, I think it adds a quota-driven feature to the purgatory, which is already non-trivial and should ideally remain unassociated with quota enforcement.

For this to work well we need to be sure that we don't hold a lot of data in the DelayQueue - and therein lies a quirk to this approach. Producer responses (and most other responses) are very small, so there is no issue. Fetch responses are fine as well, since we read off a FileMessageSet in the response (zero-copy). This will remain true even when we support SSL, since encryption occurs at the session layer (not the application layer). Topic metadata responses can be a problem though. For these we ideally want to build the topic metadata response only when we are ready to respond. So for metadata-style requests, which could contain large response objects, we may want to put the quota check and delay queue _before_ handling the request.

So the design in this approach would need an amendment: provide a choice of where to put a request in the delay queue, either before handling or after handling (before response). So for:
- small request, large response: delay queue before handling
- large request, small response: delay queue after handling, before response
- small request, small response: either is fine
- large request, large response: we really cannot do anything here, but we don't really have this scenario yet

So the design would look like this:
- parse request
- before handling the request, check if quota is violated; if so, compute two delay numbers: a before-handling delay and a before-response delay
- if before-handling delay > 0, insert into the before-handling delay queue
- handle the request
- if before-response delay > 0, insert into the before-response delay queue
- respond

Just throwing this out there for discussion.

Thanks, Joel
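The routing decision above - where in the pipeline a throttled request waits - could be sketched as follows. This is purely illustrative: computeDelays and the boolean classification are invented names, and a real implementation would classify by request type (e.g. topic metadata vs. produce/fetch).

```java
// Sketch of the amended two-stage delay-queue flow (hypothetical helper,
// not actual broker code).
public class TwoStageDelaySketch {
    // Returns {before-handling delay, before-response delay}.
    // Metadata-style requests (small request, potentially large response)
    // are delayed before handling, so the large response object is never
    // built early; produce/fetch-style requests are delayed before the
    // response, after normal handling.
    static long[] computeDelays(long totalDelayMs, boolean largeResponse) {
        return largeResponse
            ? new long[] { totalDelayMs, 0L }
            : new long[] { 0L, totalDelayMs };
    }

    public static void main(String[] args) {
        long[] metadata = computeDelays(300, true);
        long[] produce  = computeDelays(300, false);
        System.out.println(metadata[0] + "," + metadata[1]); // 300,0
        System.out.println(produce[0] + "," + produce[1]);   // 0,300
    }
}
```

The key property is that the total delay is served in exactly one of the two queues, so a request is never double-penalized for a single quota violation.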
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hi Adi,

2. I assume you were saying "than strictly needed for replication" here? Also, the concern I have is around the error code: today, if the replication is not finished within the replication timeout, then the error code will be set accordingly when it returns. Say the produce request is not satisfied after X (replication timeout) ms, but is satisfied after Y (throttling timeout) ms - should we still set the error code or not? I think it is OK to just set NO_ERROR, but we need to document such cases clearly for quota actions mixed with acks = -1.

Guozhang
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hey Guozhang,

I don't think we should return an error if the request is satisfied after Y (the throttling timeout), because it may cause the producer to think that the request was not acked at all.

Aditya
Re: [KIP-DISCUSSION] KIP-13 Quotas
The quota check for the fetch request is a bit different from the produce request. I assume that for the fetch request, we will first get an estimated fetch response size to do the quota check. There are two things to think about. First, when we actually send the response, we probably don't want to record the metric again since it will double count. Second, the bytes that the fetch response actually sends could be more than the estimate, which means the metric may not be 100% accurate. We may be able to limit the fetch size of each partition to what's in the original estimate.

For the produce request, I was thinking that another way to do this is to first figure out the quota_timeout. Then wait in the purgatory for quota_timeout with no key. If the request is not satisfied within quota_timeout and (request_timeout > quota_timeout), wait in the purgatory for (request_timeout - quota_timeout) with the original keys.

Thanks, Jun
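Jun's two-stage wait for throttled produce requests could be sketched as below. The helper name is invented for illustration; the point is just the arithmetic of the second stage: after the keyless quota_timeout wait, the remaining wait with the original watch keys is request_timeout - quota_timeout, floored at zero.

```java
// Hypothetical sketch of the second-stage wait computation, not broker code.
public class TwoStageWaitSketch {
    // Stage 1: wait quotaTimeoutMs in the purgatory with no watch keys.
    // Stage 2: if the request is still unsatisfied and
    // requestTimeoutMs > quotaTimeoutMs, re-insert with the original keys
    // for the remainder.
    static long secondStageWaitMs(long requestTimeoutMs, long quotaTimeoutMs) {
        return Math.max(0L, requestTimeoutMs - quotaTimeoutMs);
    }

    public static void main(String[] args) {
        System.out.println(secondStageWaitMs(400, 100)); // 300: re-watch original keys for 300 ms
        System.out.println(secondStageWaitMs(100, 400)); // 0: the quota delay already covered the request timeout
    }
}
```

This avoids holding watch keys during the quota delay (when early completion is impossible) while still honoring the replication timeout semantics afterwards.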
Re: [KIP-DISCUSSION] KIP-13 Quotas
Thanks for the summary. A few comments below:

1. Say a produce request has replication timeout X, and upon finishing the local append it is determined to be throttled for Y ms, where Y > X. Then, after it has timed out in the purgatory after Y ms, we should still check whether the acks have been fulfilled in order to set the correct error code in the response.

2. I think it is actually common that the calculated throttle time Y is less than the replication timeout X, which will be a tricky case, since we need to make sure that 1) the request is held in the purgatory for at least Y ms, 2) after Y ms have elapsed, if the acks have been fulfilled within X ms, we set no error code and return immediately, and 3) after X ms have elapsed, we set the timeout error code and return.

Guozhang
RE: [KIP-DISCUSSION] KIP-13 Quotas
Thanks for the review Guozhang.

1. Agreed.

2. This proposal actually waits for the maximum of the two timeouts. This reduces implementation complexity at the cost of waiting longer than strictly needed for quotas. Note that this is only for the case where acks=-1. However, if it is a significant concern, we can solve it by adding watcher keys for all partitions (only if acks=-1). These are the keys we would normally add while waiting for acknowledgements. We can change the tryComplete() function to return false until the 'quota_timeout' time has elapsed AND all the acknowledgements have been received.

Thanks,
Aditya
The timeout used in the request should be the Max(quota_delay_time, replication_timeout). In most cases, the quota timeout should be greater than the existing timeout but in order to be safe, we can use the maximum of these values. Having no watch keys will allow the operation to be enqueued directly into the timer and will not add any overhead in terms of watching keys (which was a concern). In this case, having watch keys is not beneficial since the operation must be delayed for a fixed amount of time and there is no possibility for the operation to complete before the timeout i.e. tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer will call run() which calls onComplete(). In onComplete, the DelayedProduce can repeat the check in tryComplete() (only if acks=-1 whether all replicas fetched upto a certain offset) and return the response immediately. Code will be structured as follows in ReplicaManager:appendMessages() if(isThrottled) { fetch = new DelayedProduce(timeout) purgatory.tryCompleteElseWatch(fetch, Seq()) } else if(delayedRequestRequired()) { // Insert into purgatory with watched keys for unthrottled requests } In this proposal, we avoid adding unnecessary watches because there is no possibility of early completion and this avoids any potential performance penalties we were concerned about earlier. 2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. Timeout used is the Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, the onComplete() is called and the operation proceeds normally i.e. perform a readFromLocalLog and return a response. 
The caveat here is that if the request is throttled but the throttle time is less than the max_wait timeout on the fetch request, the request will be delayed for Max(quota_delay_time, max_wait_timeout). This may be more than strictly necessary (since we are not watching for satisfaction on any keys).

I added some test cases to DelayedOperationTest to verify that it is possible to schedule operations with no watcher keys. By inserting elements with no watch keys, the purgatory simply becomes a delay queue. It may also make sense to add a new API to the purgatory called delayFor() that simply accepts an operation without any watch keys (thanks for the suggestion, Joel).

Thoughts?

Thanks,
Aditya

From: Guozhang Wang [wangg...@gmail.com]
Sent: Monday, April 13, 2015 7:27 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

I think KAFKA-2063 (bounding the fetch response) is still under discussion, and may not land in time for KAFKA-1927.

On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar aaurad
RE: [KIP-DISCUSSION] KIP-13 Quotas
This is an implementation proposal for delaying requests in quotas using the current purgatory. I'll discuss the usage for produce and fetch requests separately.

1. Delayed Produce Requests - Here, the proposal is basically to reuse DelayedProduce objects and insert them into the purgatory with no watcher keys if the request is being throttled. The timeout used in the request should be Max(quota_delay_time, replication_timeout). In most cases the quota timeout should be greater than the existing timeout, but to be safe we can use the maximum of the two values. Having no watch keys allows the operation to be enqueued directly into the timer and adds no overhead in terms of watching keys (which was a concern). In this case, having watch keys is not beneficial since the operation must be delayed for a fixed amount of time and there is no possibility of the operation completing before the timeout, i.e. tryComplete() can never return true before the timeout. On timeout, since the operation is a TimerTask, the timer will call run(), which calls onComplete(). In onComplete(), the DelayedProduce can repeat the check in tryComplete() (only if acks=-1: whether all replicas have fetched up to a certain offset) and return the response immediately.

The code in ReplicaManager.appendMessages() will be structured as follows:

if (isThrottled) {
  val delayedProduce = new DelayedProduce(timeout)
  // No watch keys: the operation sits in the timer until it expires
  purgatory.tryCompleteElseWatch(delayedProduce, Seq())
} else if (delayedRequestRequired()) {
  // Insert into purgatory with watched keys for unthrottled requests
}

In this proposal, we avoid adding unnecessary watches because there is no possibility of early completion, and this avoids the potential performance penalties we were concerned about earlier.

2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the DelayedFetch objects and insert them into the purgatory with no watcher keys if the request is throttled. The timeout used is Max(quota_delay_time, max_wait_timeout). Having no watch keys provides the same benefits as described above. Upon timeout, onComplete() is called and the operation proceeds normally, i.e. perform a readFromLocalLog and return a response.

The caveat here is that if the request is throttled but the throttle time is less than the max_wait timeout on the fetch request, the request will be delayed for Max(quota_delay_time, max_wait_timeout). This may be more than strictly necessary (since we are not watching for satisfaction on any keys).

I added some test cases to DelayedOperationTest to verify that it is possible to schedule operations with no watcher keys. By inserting elements with no watch keys, the purgatory simply becomes a delay queue. It may also make sense to add a new API to the purgatory called delayFor() that simply accepts an operation without any watch keys (thanks for the suggestion, Joel).

Thoughts?

Thanks,
Aditya

From: Guozhang Wang [wangg...@gmail.com]
Sent: Monday, April 13, 2015 7:27 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

I think KAFKA-2063 (bounding the fetch response) is still under discussion, and may not land in time for KAFKA-1927.

On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

I think it's reasonable to batch the protocol changes together. In addition to the protocol changes, is someone actively driving the server side changes/KIP process for KAFKA-2063?

Thanks,
Aditya

From: Jun Rao [j...@confluent.io]
Sent: Thursday, April 09, 2015 8:59 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Since we are also thinking about evolving the fetch request protocol in KAFKA-2063 (bound fetch response size), perhaps it's worth thinking through whether we can just evolve the protocol once.

Thanks,
Jun

On Wed, Apr 8, 2015 at 10:43 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Thanks for the detailed review. I've addressed your comments. For rejected alternatives: we rejected per-partition distribution because we chose client-based quotas, where there is no notion of partitions. I've explained this in a bit more detail in that section.

Aditya

From: Joel Koshy [jjkosh...@gmail.com]
Sent: Wednesday, April 08, 2015 6:30 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Thanks for updating the wiki. Looks great overall. Just a couple more comments:

Client status code:
- v0 requests - current version (0) of those requests.
- Fetch response has a throttled flag instead of throttle time - I think you intended the latter.
- Can you make it clear that the quota status is a new field called throttleTimeMs (or equivalent). It would help if some of that is moved (or repeated) in the compatibility/migration plan.
- So you would need
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think KAFKA-2063 (bounding the fetch response) is still under discussion, and may not land in time for KAFKA-1927.

On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

I think it's reasonable to batch the protocol changes together. In addition to the protocol changes, is someone actively driving the server side changes/KIP process for KAFKA-2063?

Thanks,
Aditya

From: Jun Rao [j...@confluent.io]
Sent: Thursday, April 09, 2015 8:59 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Since we are also thinking about evolving the fetch request protocol in KAFKA-2063 (bound fetch response size), perhaps it's worth thinking through whether we can just evolve the protocol once.

Thanks,
Jun

On Wed, Apr 8, 2015 at 10:43 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Thanks for the detailed review. I've addressed your comments. For rejected alternatives: we rejected per-partition distribution because we chose client-based quotas, where there is no notion of partitions. I've explained this in a bit more detail in that section.

Aditya

From: Joel Koshy [jjkosh...@gmail.com]
Sent: Wednesday, April 08, 2015 6:30 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Thanks for updating the wiki. Looks great overall. Just a couple more comments:

Client status code:
- v0 requests - current version (0) of those requests.
- Fetch response has a throttled flag instead of throttle time - I think you intended the latter.
- Can you make it clear that the quota status is a new field called throttleTimeMs (or equivalent). It would help if some of that is moved (or repeated) in the compatibility/migration plan.
- So you would need to upgrade brokers first, then the clients. While upgrading the brokers (via a rolling bounce), the brokers cannot start using the latest fetch-request version immediately (for replica fetches). Since there will be older brokers in the mix, those brokers would not be able to read v1 fetch requests, so all the brokers should be upgraded before switching to the latest fetch request version. This is similar to what Gwen proposed in KIP-2/KAFKA-1809, and I think we will need to use the inter-broker protocol version config.
- Rejected alternatives - quota distribution (B): notes that this is the most elegant model, but does not explain why it was rejected. I think this was because we would then need some sort of gossip between brokers, since partitions are spread across the cluster. Can you confirm?

Thanks,
Joel

On Wed, Apr 08, 2015 at 05:45:34AM +, Aditya Auradkar wrote:

Hey everyone,

Following up after today's hangout. After discussing the client side metrics piece internally, we've incorporated that section into the KIP.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
Since there appears to be sufficient consensus, I'm going to start a voting thread.

Thanks,
Aditya

From: Gwen Shapira [gshap...@cloudera.com]
Sent: Tuesday, April 07, 2015 11:31 AM
To: Sriharsha Chintalapani
Cc: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Yeah, I was not suggesting adding auth to metrics - I think this needlessly complicates everything. But we need to assume that client developers will not have access to the broker metrics (because in a secure environment they probably won't).

Gwen

On Tue, Apr 7, 2015 at 11:20 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Having auth on top of metrics is going to be a lot more difficult. How are we going to restrict the metrics reporters that run as part of the Kafka server? They will have access to all the metrics and can publish to Ganglia etc. I look at the metrics as read-only info. As you said, metrics for all the topics can be visible, but what actions are we looking at that can be non-secure based on metrics alone? This can probably be part of the KIP-11 discussion. Having said that, it would be great if the throttling details can be exposed as part of the response to the client. Instead of looking at metrics, the client can depend on the response to slow down if it is being throttled. This allows clients to be self-reliant based on the response.

-- Harsha

On April 7, 2015 at 9:55:41 AM, Gwen Shapira (gshap...@cloudera.com) wrote:

Re (1): We have no authorization story on the metrics collected by brokers, so I assume that access to broker metrics means knowing exactly which topics exist and their throughputs. (Prath and Don, correct me if I got it wrong...) Secure environments will strictly control access
Re: [KIP-DISCUSSION] KIP-13 Quotas
Since we are also thinking about evolving the fetch request protocol in KAFKA-2063 (bound fetch response size), perhaps it's worth thinking through whether we can just evolve the protocol once.

Thanks,
Jun

On Wed, Apr 8, 2015 at 10:43 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Thanks for the detailed review. I've addressed your comments. For rejected alternatives: we rejected per-partition distribution because we chose client-based quotas, where there is no notion of partitions. I've explained this in a bit more detail in that section.

Aditya

From: Joel Koshy [jjkosh...@gmail.com]
Sent: Wednesday, April 08, 2015 6:30 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Thanks for updating the wiki. Looks great overall. Just a couple more comments:

Client status code:
- v0 requests - current version (0) of those requests.
- Fetch response has a throttled flag instead of throttle time - I think you intended the latter.
- Can you make it clear that the quota status is a new field called throttleTimeMs (or equivalent). It would help if some of that is moved (or repeated) in the compatibility/migration plan.
- So you would need to upgrade brokers first, then the clients. While upgrading the brokers (via a rolling bounce), the brokers cannot start using the latest fetch-request version immediately (for replica fetches). Since there will be older brokers in the mix, those brokers would not be able to read v1 fetch requests, so all the brokers should be upgraded before switching to the latest fetch request version. This is similar to what Gwen proposed in KIP-2/KAFKA-1809, and I think we will need to use the inter-broker protocol version config.
- Rejected alternatives - quota distribution (B): notes that this is the most elegant model, but does not explain why it was rejected. I think this was because we would then need some sort of gossip between brokers, since partitions are spread across the cluster. Can you confirm?
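The rolling-upgrade constraint Joel describes can be sketched as a simple version gate. This is a hypothetical helper, not Kafka's actual implementation: the config value and version strings are illustrative, and a real implementation would compare versions more carefully than lexicographic string ordering.

```java
// Hypothetical sketch of the upgrade gate: replica fetchers keep sending the
// old fetch request version until the operator bumps the configured
// inter-broker protocol version, which only happens after every broker in the
// cluster has been upgraded and can read the new version.
public class FetchVersionGate {
    // Pick the fetch request version based on the configured
    // inter-broker protocol version (illustrative version strings).
    static int fetchRequestVersion(String interBrokerProtocolVersion) {
        // Lexicographic comparison works for these two sample values only;
        // real version comparison parses the components.
        if (interBrokerProtocolVersion.compareTo("0.9.0") >= 0) {
            return 1; // all brokers now understand the new throttleTimeMs field
        }
        return 0; // older brokers may still be in the mix: stay on v0
    }

    public static void main(String[] args) {
        System.out.println("0.8.2 -> v" + fetchRequestVersion("0.8.2"));
        System.out.println("0.9.0 -> v" + fetchRequestVersion("0.9.0"));
    }
}
```

During the rolling bounce the config stays at the old value, so mixed clusters never see a v1 fetch request; flipping the config is a second, cluster-wide step.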
Thanks,
Joel

On Wed, Apr 08, 2015 at 05:45:34AM +, Aditya Auradkar wrote:

Hey everyone,

Following up after today's hangout. After discussing the client side metrics piece internally, we've incorporated that section into the KIP.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
Since there appears to be sufficient consensus, I'm going to start a voting thread.

Thanks,
Aditya

From: Gwen Shapira [gshap...@cloudera.com]
Sent: Tuesday, April 07, 2015 11:31 AM
To: Sriharsha Chintalapani
Cc: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Yeah, I was not suggesting adding auth to metrics - I think this needlessly complicates everything. But we need to assume that client developers will not have access to the broker metrics (because in a secure environment they probably won't).

Gwen

On Tue, Apr 7, 2015 at 11:20 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Having auth on top of metrics is going to be a lot more difficult. How are we going to restrict the metrics reporters that run as part of the Kafka server? They will have access to all the metrics and can publish to Ganglia etc. I look at the metrics as read-only info. As you said, metrics for all the topics can be visible, but what actions are we looking at that can be non-secure based on metrics alone? This can probably be part of the KIP-11 discussion. Having said that, it would be great if the throttling details can be exposed as part of the response to the client. Instead of looking at metrics, the client can depend on the response to slow down if it is being throttled. This allows clients to be self-reliant based on the response.

-- Harsha

On April 7, 2015 at 9:55:41 AM, Gwen Shapira (gshap...@cloudera.com) wrote:

Re (1): We have no authorization story on the metrics collected by brokers, so I assume that access to broker metrics means knowing exactly which topics exist and their throughputs. (Prath and Don, correct me if I got it wrong...) Secure environments will strictly control access to this information, so I am pretty sure the client developers will not have access to server metrics at all.

Gwen

On Tue, Apr 7, 2015 at 7:41 AM, Jay Kreps jay.kr...@gmail.com wrote:

Totally. But is that the only use? What I wanted to flesh out was whether the goal was:
1. Expose throttling in the client metrics
2. Enable programmatic response (i.e. stop sending stuff or something like that)

I think I kind of understand (1), but let's get specific on the metric we would be adding and what exactly you would expose in a dashboard. For example, if the goal is just monitoring, do I really want a boolean flag for is_throttled, or do I want to know how much I am
Re: [KIP-DISCUSSION] KIP-13 Quotas
Thanks for updating the wiki. Looks great overall. Just a couple more comments:

Client status code:
- v0 requests - current version (0) of those requests.
- Fetch response has a throttled flag instead of throttle time - I think you intended the latter.
- Can you make it clear that the quota status is a new field called throttleTimeMs (or equivalent). It would help if some of that is moved (or repeated) in the compatibility/migration plan.
- So you would need to upgrade brokers first, then the clients. While upgrading the brokers (via a rolling bounce), the brokers cannot start using the latest fetch-request version immediately (for replica fetches). Since there will be older brokers in the mix, those brokers would not be able to read v1 fetch requests, so all the brokers should be upgraded before switching to the latest fetch request version. This is similar to what Gwen proposed in KIP-2/KAFKA-1809, and I think we will need to use the inter-broker protocol version config.
- Rejected alternatives - quota distribution (B): notes that this is the most elegant model, but does not explain why it was rejected. I think this was because we would then need some sort of gossip between brokers, since partitions are spread across the cluster. Can you confirm?

Thanks,
Joel

On Wed, Apr 08, 2015 at 05:45:34AM +, Aditya Auradkar wrote:

Hey everyone,

Following up after today's hangout. After discussing the client side metrics piece internally, we've incorporated that section into the KIP.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
Since there appears to be sufficient consensus, I'm going to start a voting thread.

Thanks,
Aditya

From: Gwen Shapira [gshap...@cloudera.com]
Sent: Tuesday, April 07, 2015 11:31 AM
To: Sriharsha Chintalapani
Cc: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Yeah, I was not suggesting adding auth to metrics - I think this needlessly complicates everything. But we need to assume that client developers will not have access to the broker metrics (because in a secure environment they probably won't).

Gwen

On Tue, Apr 7, 2015 at 11:20 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Having auth on top of metrics is going to be a lot more difficult. How are we going to restrict the metrics reporters that run as part of the Kafka server? They will have access to all the metrics and can publish to Ganglia etc. I look at the metrics as read-only info. As you said, metrics for all the topics can be visible, but what actions are we looking at that can be non-secure based on metrics alone? This can probably be part of the KIP-11 discussion. Having said that, it would be great if the throttling details can be exposed as part of the response to the client. Instead of looking at metrics, the client can depend on the response to slow down if it is being throttled. This allows clients to be self-reliant based on the response.

-- Harsha

On April 7, 2015 at 9:55:41 AM, Gwen Shapira (gshap...@cloudera.com) wrote:

Re (1): We have no authorization story on the metrics collected by brokers, so I assume that access to broker metrics means knowing exactly which topics exist and their throughputs. (Prath and Don, correct me if I got it wrong...) Secure environments will strictly control access to this information, so I am pretty sure the client developers will not have access to server metrics at all.

Gwen

On Tue, Apr 7, 2015 at 7:41 AM, Jay Kreps jay.kr...@gmail.com wrote:

Totally. But is that the only use? What I wanted to flesh out was whether the goal was:
1. Expose throttling in the client metrics
2. Enable programmatic response (i.e. stop sending stuff or something like that)

I think I kind of understand (1), but let's get specific on the metric we would be adding and what exactly you would expose in a dashboard. For example, if the goal is just monitoring, do I really want a boolean flag for is_throttled, or do I want to know how much I am being throttled (i.e. throttle_pct might indicate the percent of your request time that was due to throttling, or something like that)? If I am 1% throttled that may be irrelevant, but 99% throttled would be quite relevant? Not sure I agree, just throwing that out there...

For (2), the prior discussion seemed to kind of allude to this, but I can't really come up with a use case. Is there one?

If it is just (1), I think the question is whether it really helps much to have the metric on the client vs the server. I suppose this is a bit environment specific. If you have a central metrics system it shouldn't make any difference, but if you don't, I suppose it does.

-Jay

On Mon, Apr 6, 2015 at 7:57 PM, Gwen Shapira gshap...@cloudera.com wrote:

Here's a wild guess: An app developer included a Kafka
Re: [KIP-DISCUSSION] KIP-13 Quotas
See some responses inline below.

Tong Li
OpenStack Kafka Community Development
Building 501/B205
liton...@us.ibm.com

Jay Kreps jay.kr...@gmail.com wrote on 04/07/2015 10:41:19 AM:

From: Jay Kreps jay.kr...@gmail.com
To: dev@kafka.apache.org
Date: 04/07/2015 10:44 AM
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Totally. But is that the only use? What I wanted to flesh out was whether the goal was:
1. Expose throttling in the client metrics
2. Enable programmatic response (i.e. stop sending stuff or something like that)

I think I kind of understand (1), but let's get specific on the metric we would be adding and what exactly you would expose in a dashboard. For example, if the goal is just monitoring, do I really want a boolean flag for is_throttled, or do I want to know how much I am being throttled (i.e. throttle_pct might indicate the percent of your request time that was due to throttling, or something like that)? If I am 1% throttled that may be irrelevant, but 99% throttled would be quite relevant? Not sure I agree, just throwing that out there...

Jay, great point. I think Kafka should really just send metrics; judging whether a system is throttled should be someone else's job. I would think this comes down to design principles: if we follow the principle of separation of concerns, then this should not really be part of Kafka. I have been doing monitoring systems for a while. The system being monitored normally just sends facts about itself, such as CPU usage, network usage, disk usage, etc., to the monitoring system; the monitoring system runs various algorithms on that data to eventually decide whether a system is throttled, using thresholds and other measures. The monitoring system will also send out notifications/alarms if things turn bad. To make this discussion even easier, a set of general-purpose agents for collecting these data has been developed and is available as part of a monitoring system named Monasca. If you are interested, I can provide more information. For Kafka to take on features such as judging whether the system is throttled seems to be a move away from its core values. Just my 2 cents, of course.

For (2), the prior discussion seemed to kind of allude to this, but I can't really come up with a use case. Is there one?

If it is just (1), I think the question is whether it really helps much to have the metric on the client vs the server. I suppose this is a bit environment specific. If you have a central metrics system it shouldn't make any difference, but if you don't, I suppose it does.

-Jay

On Mon, Apr 6, 2015 at 7:57 PM, Gwen Shapira gshap...@cloudera.com wrote:

Here's a wild guess: An app developer included a Kafka Producer in his app, and is not happy with the throughput. He doesn't have visibility into the brokers since they are owned by a different team. Obviously the first instinct of a developer who knows that throttling exists is to blame throttling for any slowdown in the app. If he doesn't have a way to know from the responses whether or not his app is throttled, he may end up calling Aditya at 4am asking "Hey, is my app throttled?". I assume Aditya is trying to avoid this scenario.

On Mon, Apr 6, 2015 at 7:47 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Aditya,

2. I kind of buy it, but I'd really like to understand the details of the use case before we make protocol changes. What changes are you proposing in the clients for monitoring, and how would that be used?

-Jay

On Mon, Apr 6, 2015 at 10:36 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Hi Jay,

2. At this time, the proposed response format changes are only for monitoring/informing clients. As Jun mentioned, we get instance-level monitoring in this case, since each instance that got throttled will have a metric confirming the same. Without client-level monitoring for this, it's hard for application developers to find out whether they are being throttled, since they would also have to be aware of all the brokers in the cluster. This is quite problematic for large clusters. It seems nice for app developers to not have to think about Kafka-internal metrics and only focus on the metrics exposed on their instances - analogous to having client-side request latency metrics. Basically, we want an easy way for clients to be aware of whether they are being throttled.

4. For purgatory vs delay queue, I think we are on the same page. I feel it is nicer to use the purgatory, but I'm happy to use a DelayQueue if there are performance implications. I don't know enough about the current and Yasuhiro's new implementation to be sure one way or the other.

Stepping back, I think these two things are the only remaining points of discussion within the current proposal. Any concerns if I started a voting thread
Re: [KIP-DISCUSSION] KIP-13 Quotas
Totally. But is that the only use? What I wanted to flesh out was whether the goal was: 1. Expose throttling in the client metrics. 2. Enable a programmatic response (i.e. stop sending stuff, or something like that). I think I kind of understand (1), but let's get specific on the metric we would be adding and what exactly you would expose in a dashboard. For example, if the goal is just monitoring, do I really want a boolean is_throttled flag, or do I want to know how much I am being throttled (i.e. a throttle_pct that indicates the percent of your request time that was due to throttling, or something like that)? If I am 1% throttled that may be irrelevant, but 99% throttled would be quite relevant. Not sure I agree, just throwing that out there... For (2), the prior discussion seemed to allude to this, but I can't really come up with a use case. Is there one? If it is just (1), I think the question is whether it really helps much to have the metric on the client vs. the server. I suppose this is a bit environment specific. If you have a central metrics system it shouldn't make any difference, but if you don't, I suppose it does. -Jay

On Mon, Apr 6, 2015 at 7:57 PM, Gwen Shapira gshap...@cloudera.com wrote: Here's a wild guess: An app developer included a Kafka Producer in his app and is not happy with the throughput. He doesn't have visibility into the brokers since they are owned by a different team. Obviously, the first instinct of a developer who knows that throttling exists is to blame throttling for any slowdown in the app. If he doesn't have a way to know from the responses whether or not his app is throttled, he may end up calling Aditya at 4am asking "Hey, is my app throttled?". I assume Aditya is trying to avoid this scenario.

On Mon, Apr 6, 2015 at 7:47 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Aditya, 2. I kind of buy it, but I'd really like to understand the details of the use case before we make protocol changes. What changes are you proposing in the clients for monitoring, and how would that be used? -Jay

On Mon, Apr 6, 2015 at 10:36 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hi Jay, 2. At this time, the proposed response format changes are only for monitoring/informing clients. As Jun mentioned, we get instance-level monitoring in this case, since each instance that got throttled will have a metric confirming the same. Without client-level monitoring for this, it's hard for application developers to find out if they are being throttled, since they would also have to be aware of all the brokers in the cluster. This is quite problematic for large clusters. It seems nice for app developers to not have to think about Kafka-internal metrics and only focus on the metrics exposed on their instances, analogous to having client-side request latency metrics. Basically, we want an easy way for clients to be aware if they are being throttled. 4. For purgatory vs. delay queue, I think we are on the same page. I feel it is nicer to use the purgatory, but I'm happy to use a DelayQueue if there are performance implications. I don't know enough about the current and Yasuhiro's new implementation to be sure one way or the other. Stepping back, I think these two things are the only remaining points of discussion within the current proposal. Any concerns if I started a voting thread on the proposal after the KIP discussion tomorrow? (assuming we reach consensus on these items) Thanks, Aditya
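Jay's throttle_pct suggestion above can be made concrete with a small sketch. Everything here (the RequestSample holder, the aggregation) is an illustrative assumption, not part of the KIP or the Kafka protocol; it just shows the kind of number a dashboard would plot instead of a boolean flag.

```python
# Sketch of the throttle_pct idea: report the fraction of total request time
# spent being throttled, rather than an is_throttled boolean.
# RequestSample and throttle_pct are hypothetical names, not from the KIP.

class RequestSample:
    def __init__(self, total_ms, throttle_ms):
        self.total_ms = total_ms        # end-to-end request time
        self.throttle_ms = throttle_ms  # portion spent delayed by the broker

def throttle_pct(samples):
    """Percent of cumulative request time attributable to throttling."""
    total = sum(s.total_ms for s in samples)
    if total == 0:
        return 0.0
    return 100.0 * sum(s.throttle_ms for s in samples) / total

# A client that is 1% throttled is probably fine; 99% is actionable.
window = [RequestSample(100, 1), RequestSample(100, 1)]
print(throttle_pct(window))  # 1.0
```

This makes Jay's 1% vs. 99% distinction directly readable off a single gauge.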
RE: [KIP-DISCUSSION] KIP-13 Quotas
Our only use case currently is to expose throttling in the client metrics. I think it's better to add more information than an isThrottled flag. Initially we were thinking of exposing the amount of time the request was delayed, but the percentage throttling metric Jay mentioned also sounds good. Here are the exact metrics we could add using KM (Kafka Metrics) sensors: 1. Max - the maximum amount of time/percentage that a request was throttled during the last window. Users can configure alerting if this exceeds a certain threshold. 2. Avg - the average delay over the requests sent during the window. If exposing delay as time, this corresponds to the existing request-latency-avg and request-latency-max metrics currently on the producer. (2) Enabling a programmatic response is harder, and we don't have a use case for this. Thanks, Aditya
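As a rough model of the two sensors proposed above (throttle-time max and avg over the last window), here is a plain-Python sketch. The real client would register these through Kafka's metrics library rather than this class; the window handling and names below are illustrative assumptions only.

```python
# Illustrative stand-in for the client-side throttle-time sensors discussed
# above: a time-windowed max and avg. Not the Kafka metrics implementation.
from collections import deque
import time

class ThrottleTimeSensor:
    def __init__(self, window_sec=30.0):
        self.window_sec = window_sec
        self.samples = deque()  # (timestamp, throttle_ms) pairs

    def record(self, throttle_ms, now=None):
        now = time.time() if now is None else now
        self.samples.append((now, throttle_ms))
        self._expire(now)

    def _expire(self, now):
        # Drop samples older than the window.
        while self.samples and now - self.samples[0][0] > self.window_sec:
            self.samples.popleft()

    def max(self):
        # Worst throttling seen in the window; alert if above a threshold.
        return max((v for _, v in self.samples), default=0.0)

    def avg(self):
        # Average delay per request in the window.
        if not self.samples:
            return 0.0
        return sum(v for _, v in self.samples) / len(self.samples)
```

Alerting on `max()` matches the proposal's point 1; `avg()` is the analogue of request-latency-avg when the delay is exposed as time.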
Re: [KIP-DISCUSSION] KIP-13 Quotas
Re (1): We have no authorization story on the metrics collected by brokers, so I assume that access to broker metrics means knowing exactly which topics exist and their throughputs. (Prath and Don, correct me if I got it wrong...) Secure environments will strictly control access to this information, so I am pretty sure the client developers will not have access to server metrics at all. Gwen
Re: [KIP-DISCUSSION] KIP-13 Quotas
From: Jay Kreps [jay.kr...@gmail.com] Sent: Saturday, April 04, 2015 1:36 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hey Aditya, 2. For the return flag I'm not terribly particular. If we want to add it, let's fully think through how it will be used. The only concern I have is adding to the protocol without really thinking through the use cases. So let's work out the APIs we want to add to the Java consumer and producer and the use cases for how clients will make use of these. For my part, I actually don't see much use other than monitoring, since it isn't an error condition to be at your quota. And if it is just monitoring, I don't see a big enough difference between having the monitoring on the server side versus in the clients to justify putting it in the protocol. But I think you guys may have other use cases in mind of how a client would make some use of this? Let's work that out. I also don't feel strongly about it; it wouldn't be *bad* to have the monitoring available on the client, it just doesn't seem that much better. 4. For the purgatory vs. delay queue, I think it is arguably nicer to reuse the purgatory; we just have to be ultra-conscious of efficiency. I think our goal is to turn quotas on across the board, so at LinkedIn that would mean potentially every request will need a small delay. I haven't worked out the efficiency implications of this choice, so as long as we do that I'm happy. -Jay

On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Some responses to Jay's points. 1. Using commas - cool. 2. Adding return flag - I'm inclined to agree with Joel that this is good to have in the initial implementation. 3. Config - +1. I'll remove it from the KIP. We can discuss this in parallel. 4. Purgatory vs. delay queue - I feel that it is simpler to reuse the existing purgatories for both delayed produce and fetch requests. IIUC, all we need for quotas is a minWait parameter for DelayedOperation (or something equivalent), since there is already a max wait. The completion criteria can check if minWait time has elapsed before declaring the operation complete. For this to impact performance, a significant number of clients would need to exceed their quota at the same time, and even then I'm not very clear on the scope of the impact. Two layers of delays might add complexity to the implementation, which I'm hoping to avoid. Aditya

From: Joel Koshy [jjkosh...@gmail.com] Sent: Friday, April 03, 2015 12:48 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Aditya, thanks for the updated KIP, and Jay/Jun thanks for the comments. Couple of comments in-line: 2. I would advocate for adding the return flag when we next bump the request format version, just to avoid proliferation. I agree this is a good thing to know about, but at the moment I don't think we have a very well fleshed-out idea of how the client would actually make use of this info. I'm somewhat inclined to having something appropriate off the bat, mainly because (i) clients really should know that they have been throttled and (ii) a smart producer/consumer implementation would want to know how much to back off. So perhaps this and config management should be moved to a separate discussion, but it would be good to have this discussion going and incorporated into the first quota implementation. 3. Config--I think we need to generalize the topic stuff so we can override at multiple levels. We have topic and client, but I suspect user and broker will also be important. I recommend we take config stuff out of this KIP since we really need to fully think through a proposal that will cover all these types of overrides.
+1 - it is definitely orthogonal to the core quota implementation (although necessary for its operability). Having a config-related discussion in this KIP would only draw out the discussion and vote
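Aditya's point 4 (a minWait parameter on the delayed operation, with the completion criterion checking that minWait has elapsed) can be sketched as follows. Kafka's actual purgatory and DelayedOperation are Scala; the Python below is only a simplified model of the rule being discussed, and ThrottledOperation is the name floated in the thread, not an existing class.

```python
# Simplified model of the minWait completion criterion: the operation cannot
# complete before the quota-imposed delay has elapsed, on top of whatever
# completion check (replication, fetch min bytes, ...) it already had.
import time

class ThrottledOperation:
    def __init__(self, min_wait_ms, max_wait_ms, is_satisfied):
        self.created = time.time()
        self.min_wait_ms = min_wait_ms    # quota-imposed minimum delay
        self.max_wait_ms = max_wait_ms    # existing request timeout (max wait)
        self.is_satisfied = is_satisfied  # the original completion check

    def elapsed_ms(self, now=None):
        now = time.time() if now is None else now
        return (now - self.created) * 1000.0

    def try_complete(self, now=None):
        # Cannot complete early: the throttle delay must elapse first.
        if self.elapsed_ms(now) < self.min_wait_ms:
            return False
        return self.is_satisfied()

    def expired(self, now=None):
        return self.elapsed_ms(now) >= self.max_wait_ms
```

This shows why only one layer of delay is needed: the purgatory's existing expiry handles the max wait, and minWait simply gates early completion.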
Re: [KIP-DISCUSSION] KIP-13 Quotas
Gwen, I feel we should assume the metrics server is an external system, and access to that server should be managed by the security features provided by that system. This way, it would be the Kafka system administrator's responsibility to ensure the metrics system is properly firewalled or access controlled. Also, we won't be dependent on the metrics system chosen by the user. I hope this makes sense. Thanks Bosco
Re: [KIP-DISCUSSION] KIP-13 Quotas
On Tue, Apr 07, 2015 at 07:41:19AM -0700, Jay Kreps wrote: If it is just (1) I think the question is whether it really helps much to have the metric on the client vs the server. I suppose this is a bit environment specific. If you have a central metrics system it shouldn't make any difference, but if you don't I suppose it does.

At LinkedIn, for instance, we do have a central metrics platform and any user can poke at those server-side metrics. In practice, though, users are averse to looking at dashboards for services they do not own directly (such as Kafka) and pay close attention to their specific client-side metrics. This is the main reason we think it is important to add _some_ visibility on the client side into whether the client is being actively throttled or not. The second use, i.e., reacting appropriately, is a less pressing reason but a valid one: a smart implementation can be given a hint on how it should space out its requests to avoid getting continuously throttled. An even smarter client could derive this with just an isThrottled flag, but it could end up flapping between throttled/not-throttled while it tries to infer a suitable throughput from just the flag. So basically I think it is critical to have _some_ indicator on the client side, and I completely agree we should think through carefully what information we send back to the client.
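Joel's distinction between a bare isThrottled flag and an explicit throttle-time hint can be illustrated with a toy pacing policy. This is not the KIP's client behavior; PacedSender and its decay rule are assumptions for illustration only.

```python
# Sketch of the "smart client" point: if the broker returns how long it
# throttled a request (rather than a bare isThrottled flag), the client can
# space out sends smoothly instead of flapping between throttled and not.

class PacedSender:
    def __init__(self):
        self.backoff_ms = 0.0  # extra delay inserted before the next send

    def on_response(self, throttle_time_ms):
        if throttle_time_ms > 0:
            # Match the broker's hint so the next request arrives after the
            # quota window has recovered.
            self.backoff_ms = max(self.backoff_ms, throttle_time_ms)
        else:
            # Decay gently rather than dropping straight to zero; a boolean
            # flag would force the flapping Joel describes.
            self.backoff_ms *= 0.5

sender = PacedSender()
sender.on_response(200)   # broker delayed us 200 ms
sender.on_response(0)     # un-throttled response decays the backoff
print(sender.backoff_ms)  # 100.0
```

With only a flag, the first branch has no magnitude to work with, which is why the thread argues for sending the delay (or a percentage) back to the client.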
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hey everyone, Following up after today's hangout. After discussing the client-side metrics piece internally, we've incorporated that section into the KIP. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Since there appears to be sufficient consensus, I'm going to start a voting thread. Thanks, Aditya

From: Gwen Shapira [gshap...@cloudera.com] Sent: Tuesday, April 07, 2015 11:31 AM To: Sriharsha Chintalapani Cc: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Yeah, I was not suggesting adding auth to metrics; I think this needlessly complicates everything. But we need to assume that client developers will not have access to the broker metrics (because in a secure environment they probably won't). Gwen

On Tue, Apr 7, 2015 at 11:20 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Having auth on top of metrics is going to be a lot more difficult. How are we going to restrict metrics reporters that run as part of the Kafka server? They have access to all the metrics and can publish to Ganglia etc. I look at the metrics as read-only info. As you said, metrics for all the topics can be visible, but what actions are we looking at that can be non-secure based on metrics alone? This can probably be part of the KIP-11 discussion. Having said that, it would be great if the throttling details could be exposed as part of the response to the client. Instead of looking at metrics, the client can depend on the response to slow down if it is being throttled. This way clients can be self-reliant based on the response. -- Harsha
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hi Jay,

2. At this time, the proposed response format changes are only for monitoring/informing clients. As Jun mentioned, we get instance-level monitoring in this case, since each instance that got throttled will have a metric confirming it. Without client-level monitoring for this, it's hard for application developers to tell whether they are being throttled, since they would also have to be aware of all the brokers in the cluster. This is quite problematic for large clusters. It seems nice for app developers to not have to think about Kafka-internal metrics and to focus only on the metrics exposed on their own instances, analogous to having client-side request latency metrics. Basically, we want an easy way for clients to be aware that they are being throttled.

4. For purgatory vs. delay queue, I think we are on the same page. I feel it is nicer to use the purgatory, but I'm happy to use a DelayQueue if there are performance implications. I don't know enough about the current and Yasuhiro's new implementations to be sure one way or the other.

Stepping back, I think these two items are the only remaining points of discussion within the current proposal. Any concerns if I started a voting thread on the proposal after the KIP discussion tomorrow (assuming we reach consensus on these items)?

Thanks,
Aditya
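The client-side awareness Aditya argues for could look roughly like the sketch below. The isThrottled response field and the class names are assumptions for illustration; they represent the field under discussion, not a finalized wire format.

```java
/** Hypothetical sketch of client-side throttle monitoring: if each broker
 *  response carried an isThrottled flag, the client could maintain one
 *  local metric instead of operators watching every broker's metrics.
 *  Field and class names are illustrative, not the KIP's final format. */
class ProduceResponse {
    final boolean isThrottled;  // proposed new response field
    ProduceResponse(boolean isThrottled) { this.isThrottled = isThrottled; }
}

class ClientThrottleMetric {
    private long throttledResponses = 0;
    private long totalResponses = 0;

    /** Called for every response the client receives, from any broker. */
    void onResponse(ProduceResponse r) {
        totalResponses++;
        if (r.isThrottled) throttledResponses++;
    }

    long throttledResponses() { return throttledResponses; }
    long totalResponses() { return totalResponses; }
}
```

The point of the sketch is that the app developer inspects one metric on their own instance, regardless of how many brokers the cluster has.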
Re: [KIP-DISCUSSION] KIP-13 Quotas
As for 4, if we are going to reuse the purgatory class, are we going to use the existing produce/fetch purgatory objects, or create a new throttle purgatory object? If we go with the first option, then I think Jun's concern is valid: some produce/fetch requests will have many keys, so calling watch() will end up adding the request to each one of the watch lists, and we have seen issues with this scenario before.

Guozhang
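Guozhang's concern can be illustrated with a toy model (this is not Kafka's actual purgatory implementation; all names are invented): an operation watched under many keys is appended to one watch list per key, so a multi-partition produce/fetch request multiplies the bookkeeping that a single per-client throttle key would avoid.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy purgatory to illustrate watch-list growth; not Kafka code. */
class ToyPurgatory {
    private final Map<String, List<Object>> watchLists = new HashMap<>();

    /** Registers the operation under every key it is watched on. */
    void watch(Object op, List<String> keys) {
        for (String key : keys)
            watchLists.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
    }

    /** Total entries across all watch lists: grows with keys, not requests. */
    int totalWatchEntries() {
        return watchLists.values().stream().mapToInt(List::size).sum();
    }
}
```

A fetch watched under three partition keys costs three watch-list entries, while a throttle-only operation watched under one per-client key costs one; that asymmetry is the argument for a separate throttle purgatory (or a plain delay queue).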
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Aditya,

2. I kind of buy it, but I'd really like to understand the details of the use case before we make protocol changes. What changes are you proposing in the clients for monitoring, and how would they be used?

-Jay
Re: [KIP-DISCUSSION] KIP-13 Quotas
Here's a wild guess: an app developer included a Kafka producer in his app and is not happy with the throughput. He doesn't have visibility into the brokers, since they are owned by a different team. Obviously the first instinct of a developer who knows that throttling exists is to blame throttling for any slowdown in the app. If he doesn't have a way to know from the responses whether or not his app is throttled, he may end up calling Aditya at 4 a.m. asking "Hey, is my app throttled?". I assume Aditya is trying to avoid this scenario.
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Aditya,

2. For the return flag I'm not terribly particular. If we want to add it, let's fully think through how it will be used. The only concern I have is adding to the protocol without really thinking through the use cases. So let's work out the APIs we want to add to the Java consumer and producer, and the use cases for how clients will make use of these. For my part I actually don't see much use other than monitoring, since it isn't an error condition to be at your quota. And if it is just monitoring, I don't see a big enough difference between having the monitoring on the server side versus in the clients to justify putting it in the protocol. But I think you guys may have other use cases in mind of how a client would make some use of this? Let's work that out. I also don't feel strongly about it: it wouldn't be *bad* to have the monitoring available on the client, it just doesn't seem that much better.

4. For the purgatory vs. delay queue, I think it is arguably nicer to reuse the purgatory; we just have to be ultra-conscious of efficiency. I think our goal is to turn quotas on across the board, so at LinkedIn that would mean potentially every request will need a small delay. I haven't worked out the efficiency implications of this choice, so as long as we do that I'm happy.

-Jay

On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Some responses to Jay's points.

1. Using commas - Cool.
2. Adding return flag - I'm inclined to agree with Joel that this is good to have in the initial implementation.
3. Config - +1. I'll remove it from the KIP. We can discuss this in parallel.
4. Purgatory vs Delay queue - I feel that it is simpler to reuse the existing purgatories for both delayed produce and fetch requests. IIUC, all we need for quotas is a minWait parameter for DelayedOperation (or something equivalent), since there is already a max wait.
The completion criteria can check whether minWait time has elapsed before declaring the operation complete. For this to impact performance, a significant number of clients would need to exceed their quota at the same time, and even then I'm not very clear on the scope of the impact. Two layers of delays might add complexity to the implementation, which I'm hoping to avoid.

Aditya

From: Joel Koshy [jjkosh...@gmail.com]
Sent: Friday, April 03, 2015 12:48 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Aditya, thanks for the updated KIP, and Jay/Jun, thanks for the comments. A couple of comments in-line:

2. I would advocate for adding the return flag when we next bump the request format version just to avoid proliferation. I agree this is a good thing to know about, but at the moment I don't think we have a very well fleshed-out idea of how the client would actually make use of this info.

I'm somewhat inclined to having something appropriate off the bat, mainly because (i) clients really should know that they have been throttled and (ii) a smart producer/consumer implementation would want to know how much to back off. So perhaps this and config management should be moved to a separate discussion, but it would be good to have this discussion going and incorporated into the first quota implementation.

3. Config--I think we need to generalize the topic stuff so we can override at multiple levels. We have topic and client, but I suspect user and broker will also be important. I recommend we take config stuff out of this KIP since we really need to fully think through a proposal that will cover all these types of overrides.

+1 - it is definitely orthogonal to the core quota implementation (although necessary for its operability). Having a config-related discussion in this KIP would only draw out the discussion and vote, even if the core quota design looks good to everyone.
So basically I think we can remove the portions on dynamic config as well as the response format, but I really think we should close on those while the implementation is in progress and before quotas are officially released.

4. Instead of using purgatories to implement the delay, would it make more sense to just use a delay queue? I think all the additional stuff in the purgatory other than the delay queue doesn't make sense, as the quota is a hard N ms penalty with no chance of early eviction. If there is no perf penalty for the full purgatory, it may be fine (even good) to reuse it, but I haven't looked into that.

A simple delay queue sounds good - I think Aditya was also trying to avoid adding a new quota purgatory. That is, it may be possible to use the existing purgatory instances to enforce quotas. That may be simpler, but would incur a slight perf penalty if too many clients are being throttled.

Thanks,
Joel
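Aditya's minWait idea quoted above could be sketched like this. The names (ThrottledOperation, tryComplete, criteriaSatisfied) and shape are assumptions for illustration; Kafka's real DelayedOperation API differs.

```java
/** Sketch of the minWait idea from the thread (assumed names, not Kafka's
 *  actual DelayedOperation): an operation only completes once its normal
 *  criteria are met AND the quota-imposed minimum delay has elapsed. */
abstract class ThrottledOperation {
    private final long createdMs;
    private final long minWaitMs;  // quota delay: no early completion before this

    ThrottledOperation(long createdMs, long minWaitMs) {
        this.createdMs = createdMs;
        this.minWaitMs = minWaitMs;
    }

    /** Normal completion criteria, e.g. minBytes satisfied for a fetch. */
    abstract boolean criteriaSatisfied();

    /** Completes only when both the quota delay and the criteria hold. */
    boolean tryComplete(long nowMs) {
        return (nowMs - createdMs >= minWaitMs) && criteriaSatisfied();
    }
}
```

The existing max-wait timeout would still cap the total delay, so a throttled fetch completes at max(maxWait, quotaDelay) at the latest, as discussed later in the thread.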
Re: [KIP-DISCUSSION] KIP-13 Quotas
Thanks for the update.

10. About whether to return a new field in the response to indicate throttling: earlier, the plan was to not change the response format and just have a metric on the broker to indicate whether a clientId is throttled or not. The issue is that we don't know whether a particular clientId instance is throttled or not (since there could be multiple clients with the same clientId). Your proposal of adding an isThrottled field in the response addresses this and seems better. Then, do we just throttle the new version of produce/fetch requests, or both the old and the new versions? Also, we probably still need a separate metric on the broker side to indicate whether a clientId is throttled or not.

11. Just to clarify: for fetch requests, when will metric.record(fetchSize) be called? Is it when we are ready to send the fetch response (after minBytes and maxWait are satisfied)? As an implementation detail, it may be useful for the quota code to return an estimated delay time (to bring the measurement within the limit) in QuotaViolationException.

Thanks,
Jun

On Wed, Apr 1, 2015 at 3:27 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Hey everyone, I've made changes to the KIP to capture our discussions over the last couple of weeks: https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas I'll start a voting thread after people have had a chance to read/comment.

Thanks,
Aditya

From: Steven Wu [stevenz...@gmail.com]
Sent: Friday, March 20, 2015 9:14 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

+1 on Jun's suggestion of maintaining one set/style of metrics at the broker. In Netflix, we have to convert the Yammer metrics to Servo metrics at the broker. It will be painful to learn that some metrics are in a different style and have to be handled differently.

On Fri, Mar 20, 2015 at 8:17 AM, Jun Rao j...@confluent.io wrote:

Not so sure. People who use quotas will definitely want to monitor the new metrics at the client id level.
Then they will need to deal with those metrics differently from the rest of the metrics. It would be better if we can hide this complexity from the users.

Thanks,
Jun

On Thu, Mar 19, 2015 at 10:45 PM, Joel Koshy jjkosh...@gmail.com wrote:

Actually, thinking again - since these will be a few new metrics at the client id level (bytes in and bytes out to start with), maybe it is fine to have the two types of metrics coexist, and we can migrate the existing metrics in parallel.

On Thursday, March 19, 2015, Joel Koshy jjkosh...@gmail.com wrote:

That is a valid concern, but in that case I think it would be better to just migrate completely to the new metrics package first.

On Thursday, March 19, 2015, Jun Rao j...@confluent.io wrote:

Hmm, I was thinking a bit differently on the metrics stuff. I think it would be confusing to have some metrics defined in the new metrics package while some others are defined in Coda Hale. Those metrics will look different (e.g., rates in Coda Hale will have special attributes such as 1-min-average), and people may need different ways to export the metrics to external systems such as Graphite. So, instead of using the new metrics package on the broker, I was thinking that we can just implement a QuotaMetrics that wraps the Coda Hale metrics. The implementation can be the same as what's in the new metrics package.

Thanks,
Jun

On Thu, Mar 19, 2015 at 8:09 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah, I was saying that we are blocked on picking an approach for metrics but not necessarily the full conversion. Clearly, if we pick the new metrics package we would need to implement the two metrics we want to quota on, but the conversion of the remaining metrics can be done asynchronously.

-Jay

On Thu, Mar 19, 2015 at 5:56 PM, Joel Koshy jjkosh...@gmail.com wrote:

in KAFKA-1930). I agree that this KIP doesn't need to block on the migration of the metrics package. Can you clarify the above?
i.e., if we are going to quota on something, then we would want to have migrated that metric over, right? Or do you mean we don't need to complete the migration of all metrics to the metrics package? I think most of us now feel that delay + no error is a good approach, but it would be good to make sure everyone is on the same page. As Aditya requested a couple of days ago, I think we should go over this at the next KIP hangout.

Joel

On Thu, Mar 19, 2015 at 09:24:09AM -0700, Jun Rao wrote:

1. Delay + no error seems reasonable to me. However, I do feel that we need
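Jun's QuotaMetrics-wrapper idea above could be sketched roughly as follows. This is a self-contained illustration with invented names; a real implementation would delegate to Coda Hale meters rather than the SimpleRate stand-in used here, so that broker operators see a single metrics style.

```java
/** Sketch of a single facade for quota measurements (names assumed):
 *  quota code records through one interface, and the backing store can be
 *  Coda Hale on the broker or the new metrics package on the client. */
interface RateMetric {
    void record(double value, long nowMs);
    double rate(long nowMs);   // units per second since the first sample
}

/** Minimal stand-in for the real backing meter; illustration only. */
class SimpleRate implements RateMetric {
    private double sum = 0;
    private long firstMs = -1;

    public void record(double value, long nowMs) {
        if (firstMs < 0) firstMs = nowMs;
        sum += value;
    }

    public double rate(long nowMs) {
        long elapsedMs = Math.max(nowMs - firstMs, 1);
        return sum * 1000.0 / elapsedMs;
    }
}
```

The quota-enforcement path would depend only on RateMetric, which is what lets the broker keep its existing Yammer/Coda Hale reporting pipeline unchanged.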
RE: [KIP-DISCUSSION] KIP-13 Quotas
Update, I added a proposal on doing dynamic client based configuration that can be used for quotas. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Please take a look and let me know if there are any concerns. Thanks, Aditya From: Aditya Auradkar Sent: Friday, April 03, 2015 10:10 AM To: dev@kafka.apache.org Subject: RE: [KIP-DISCUSSION] KIP-13 Quotas Thanks Jun. Some thoughts: 10) I think it is better we throttle regardless of the produce/fetch version. This is a nice feature where clients can tell if they are being throttled or not. If we only throttle newer clients, then we have inconsistent behavior across clients in a multi-tenant cluster. Having quota metrics on the client side is also a nice incentive to upgrade client versions. 11) I think we can call metric.record(fetchSize) before adding the delayedFetch request into the purgatory. This will give us the estimated delay of the request up-front. The timeout on the DelayedFetch is the Max(maxWait, quotaDelay). The DelayedFetch completion criteria can change a little to accomodate quotas. - I agree the quota code should return the estimated delay time in QuotaViolationException. Thanks, Aditya From: Jun Rao [j...@confluent.io] Sent: Friday, April 03, 2015 9:16 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Thanks for the update. 10. About whether to return a new field in the response to indicate throttling. Earlier, the plan was to not change the response format and just have a metric on the broker to indicate whether a clientId is throttled or not. The issue is that we don't know whether a particular clientId instance is throttled or not (since there could be multiple clients with the same clientId). Your proposal of adding an isThrottled field in the response addresses and seems better. Then, do we just throttle the new version of produce/fetch request or both the old and the new versions? 
Also, we probably still need a separate metric on the broker side to indicate whether a clientId is throttled or not. 11. Just to clarify. For fetch requests, when will metric.record(fetchSize) be called? Is it when we are ready to send the fetch response (after minBytes and maxWait are satisfied)? As an implementation detail, it may be useful for the quota code to return an estimated delay time (to bring the measurement within the limit) in QuotaViolationException. Thanks, Jun On Wed, Apr 1, 2015 at 3:27 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hey everyone, I've made changes to the KIP to capture our discussions over the last couple of weeks. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas I'll start a voting thread after people have had a chance to read/comment. Thanks, Aditya From: Steven Wu [stevenz...@gmail.com] Sent: Friday, March 20, 2015 9:14 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas +1 on Jun's suggestion of maintaining one set/style of metrics at broker. In Netflix, we have to convert the yammer metrics to servo metrics at broker. it will be painful to know some metrics are in a different style and get to be handled differently. On Fri, Mar 20, 2015 at 8:17 AM, Jun Rao j...@confluent.io wrote: Not so sure. People who use quota will definitely want to monitor the new metrics at the client id level. Then they will need to deal with those metrics differently from the rest of the metrics. It would be better if we can hide this complexity from the users. Thanks, Jun On Thu, Mar 19, 2015 at 10:45 PM, Joel Koshy jjkosh...@gmail.com wrote: Actually thinking again - since these will be a few new metrics at the client id level (bytes in and bytes out to start with) maybe it is fine to have the two type of metrics coexist and we can migrate the existing metrics in parallel. 
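The estimated-delay idea in (11) above (record the metric up front, have the quota check throw with the delay needed to bring the measurement back under the limit) can be sketched roughly as follows. This is an illustrative stand-in, not Kafka's code: `ClientQuotaMetric`, the single-window bookkeeping, and the delay formula are all assumptions made for the example.

```java
/** Illustrative only: carries the estimated delay, as discussed in the thread. */
class QuotaViolationException extends RuntimeException {
    final long delayMs;
    QuotaViolationException(long delayMs) {
        super("Quota violated; estimated delay " + delayMs + " ms");
        this.delayMs = delayMs;
    }
}

/** Tracks bytes observed in a single window and checks them against a bytes/sec quota. */
class ClientQuotaMetric {
    private final double quotaBytesPerSec;
    private final long windowMs;
    private long windowStart;
    private double bytesInWindow;

    ClientQuotaMetric(double quotaBytesPerSec, long windowMs) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.windowMs = windowMs;
        this.windowStart = System.currentTimeMillis();
    }

    /** Records an observation; throws with an estimated delay if the quota is exceeded. */
    synchronized void record(double bytes, long nowMs) {
        if (nowMs - windowStart >= windowMs) {   // naive single-window reset
            windowStart = nowMs;
            bytesInWindow = 0;
        }
        bytesInWindow += bytes;
        double allowed = quotaBytesPerSec * (windowMs / 1000.0);
        if (bytesInWindow > allowed) {
            // Time the response must be held so the windowed rate falls back to the quota.
            long delay = (long) ((bytesInWindow - allowed) / quotaBytesPerSec * 1000.0);
            throw new QuotaViolationException(delay);
        }
    }
}
```

A real implementation would use several rolling samples rather than one window; the point here is only that the exception can carry the delay for the caller (e.g. a DelayedFetch) to use as its timeout.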
Re: [KIP-DISCUSSION] KIP-13 Quotas
This is fantastic. A couple of minor things:

1. I think we use commas rather than semicolons for list item separators in config.

2. I would advocate for adding the return flag when we next bump the request format version, just to avoid proliferation. I agree this is a good thing to know about, but at the moment I don't think we have a very well fleshed out idea of how the client would actually make use of this info. I think we tend to have a bunch of these minor tweaks, and it would probably be a lot less confusing to people if we batched them up and added them all at once. Another one that is needed, for example, is the per-fetch-request memory limit.

3. Config - I think we need to generalize the topic stuff so we can override at multiple levels. We have topic and client, but I suspect user and broker will also be important. I recommend we take config stuff out of this KIP, since we really need to fully think through a proposal that will cover all these types of overrides.

4. Instead of using purgatories to implement the delay, would it make more sense to just use a delay queue? I think all the additional stuff in the purgatory other than the delay queue doesn't make sense, as the quota is a hard N ms penalty with no chance of early eviction. If there is no perf penalty for the full purgatory, that may be fine (even good) to reuse, but I haven't looked into that.

-Jay
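The delay-queue alternative in point 4 maps naturally onto `java.util.concurrent.DelayQueue`: a throttled response is parked with its fixed penalty and becomes available to send only once the delay expires. A minimal sketch under that assumption; `ThrottledResponse` and `ThrottleSender` are illustrative names, not Kafka classes.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// A response held back for a fixed quota penalty; no early eviction is possible.
class ThrottledResponse implements Delayed {
    final String clientId;
    final long sendAtMs;

    ThrottledResponse(String clientId, long delayMs) {
        this.clientId = clientId;
        this.sendAtMs = System.currentTimeMillis() + delayMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(sendAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

class ThrottleSender {
    private final DelayQueue<ThrottledResponse> queue = new DelayQueue<>();

    void throttle(String clientId, long delayMs) {
        queue.put(new ThrottledResponse(clientId, delayMs));
    }

    /** Blocks until the next penalty expires, then returns the response to send. */
    ThrottledResponse takeExpired() throws InterruptedException {
        return queue.take();
    }
}
```

Unlike a purgatory entry, nothing here is keyed by partition, which is the cheapness Jay is pointing at: the queue only orders responses by expiry time.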
Re: [KIP-DISCUSSION] KIP-13 Quotas
Aditya, thanks for the updated KIP, and Jay/Jun thanks for the comments. A couple of comments in-line:

2. I would advocate for adding the return flag when we next bump the request format version just to avoid proliferation. I agree this is a good thing to know about, but at the moment I don't think we have a very well fleshed out idea of how the client would actually make use of this info.

I'm somewhat inclined to having something appropriate off the bat, mainly because (i) clients really should know that they have been throttled and (ii) a smart producer/consumer implementation would want to know how much to back off. So perhaps this and config management should be moved to a separate discussion, but it would be good to have this discussion going and incorporated into the first quota implementation.

3. Config - I think we need to generalize the topic stuff so we can override at multiple levels. We have topic and client, but I suspect user and broker will also be important. I recommend we take config stuff out of this KIP, since we really need to fully think through a proposal that will cover all these types of overrides.

+1 - it is definitely orthogonal to the core quota implementation (although necessary for its operability). Having a config-related discussion in this KIP would only draw out the discussion and vote, even if the core quota design looks good to everyone. So basically I think we can remove the portions on dynamic config as well as the response format, but I really think we should close on those while the implementation is in progress and before quotas is officially released.

4. Instead of using purgatories to implement the delay, would it make more sense to just use a delay queue? I think all the additional stuff in the purgatory other than the delay queue doesn't make sense, as the quota is a hard N ms penalty with no chance of early eviction. If there is no perf penalty for the full purgatory, that may be fine (even good) to reuse, but I haven't looked into that.

A simple delay queue sounds good - I think Aditya was also trying to avoid adding a new quota purgatory, i.e., it may be possible to use the existing purgatory instances to enforce quotas. That may be simpler, but would incur a slight perf penalty if too many clients are being throttled.

Thanks,
Joel
Re: [KIP-DISCUSSION] KIP-13 Quotas
11. We have to be careful not to double count when recording the metrics. Currently, metric.record(fetchSize) is called on completion of a fetch request. If we call metric.record(fetchSize) before the fetch request is added to the purgatory, then we shouldn't call it again on completion. However, the bytes actually sent to the client may differ from what's recorded, since new data may have accumulated since the recording time.

4. Adding a throttled request to the purgatory will add overhead if the request has many partitions. A delay queue would be much cheaper. A given request doesn't have to go through two layers of delays: depending on whether it is throttled or not, it goes either to the purgatory or to the delay queue.

Thanks,
Jun
RE: [KIP-DISCUSSION] KIP-13 Quotas
Some responses to Jay's points.

1. Using commas - cool.

2. Adding the return flag - I'm inclined to agree with Joel that this is good to have in the initial implementation.

3. Config - +1. I'll remove it from the KIP. We can discuss this in parallel.

4. Purgatory vs. delay queue - I feel that it is simpler to reuse the existing purgatories for both delayed produce and fetch requests. IIUC, all we need for quotas is a minWait parameter for DelayedOperation (or something equivalent), since there is already a max wait. The completion criteria can check whether minWait time has elapsed before declaring the operation complete. For this to impact performance, a significant number of clients would need to exceed their quota at the same time, and even then I'm not very clear on the scope of the impact. Two layers of delays might add complexity to the implementation, which I'm hoping to avoid.

Aditya
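The minWait idea Aditya describes can be sketched as a completion check that gates the usual criteria on the quota delay having elapsed. This is a simplified stand-in, not the actual DelayedOperation API; the class and field names are invented for the example.

```java
// Hypothetical sketch: a DelayedFetch-like operation that cannot complete before
// the quota-imposed delay (minWait) has passed, even if its normal completion
// criteria (e.g. minBytes accumulated) are already satisfied.
class ThrottledDelayedFetch {
    private final long createdMs;
    private final long minWaitMs;   // quota-imposed delay
    private final long maxWaitMs;   // the fetch request's own max wait

    ThrottledDelayedFetch(long createdMs, long minWaitMs, long maxWaitMs) {
        this.createdMs = createdMs;
        this.minWaitMs = minWaitMs;
        this.maxWaitMs = maxWaitMs;
    }

    /** Overall timeout is max(maxWait, quota delay), as discussed in the thread. */
    long timeoutMs() {
        return Math.max(maxWaitMs, minWaitMs);
    }

    /** Complete only when the data criteria hold AND the quota delay has passed. */
    boolean tryComplete(long nowMs, boolean enoughBytesAccumulated) {
        boolean minWaitElapsed = nowMs - createdMs >= minWaitMs;
        return minWaitElapsed
            && (enoughBytesAccumulated || nowMs - createdMs >= timeoutMs());
    }
}
```

This is the "one layer of delay" variant: the same purgatory entry enforces both the fetch wait and the throttle, at the cost of the per-partition watcher overhead Jun mentions.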
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hey everyone, I've made changes to the KIP to capture our discussions over the last couple of weeks. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas I'll start a voting thread after people have had a chance to read/comment.

Thanks,
Aditya

From: Steven Wu [stevenz...@gmail.com]
Sent: Friday, March 20, 2015 9:14 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

+1 on Jun's suggestion of maintaining one set/style of metrics at the broker. At Netflix, we have to convert the Yammer metrics to Servo metrics at the broker. It will be painful if some metrics are in a different style and have to be handled differently.

On Fri, Mar 20, 2015 at 8:17 AM, Jun Rao j...@confluent.io wrote:

Not so sure. People who use quotas will definitely want to monitor the new metrics at the client id level. Then they will need to deal with those metrics differently from the rest of the metrics. It would be better if we can hide this complexity from the users.

Thanks,
Jun

On Thu, Mar 19, 2015 at 10:45 PM, Joel Koshy jjkosh...@gmail.com wrote:

Actually, thinking again - since these will be a few new metrics at the client id level (bytes in and bytes out to start with), maybe it is fine to have the two types of metrics coexist, and we can migrate the existing metrics in parallel.

On Thursday, March 19, 2015, Joel Koshy jjkosh...@gmail.com wrote:

That is a valid concern, but in that case I think it would be better to just migrate completely to the new metrics package first.

On Thursday, March 19, 2015, Jun Rao j...@confluent.io wrote:

Hmm, I was thinking a bit differently on the metrics stuff. I think it would be confusing to have some metrics defined in the new metrics package while some others are defined in Coda Hale. Those metrics will look different (e.g., rates in Coda Hale will have special attributes such as 1-min-average). People may need different ways to export the metrics to external systems such as Graphite. So, instead of using the new metrics package on the broker, I was thinking that we can just implement a QuotaMetrics that wraps the Coda Hale metrics. The implementation can be the same as what's in the new metrics package.

Thanks,
Jun

On Thu, Mar 19, 2015 at 8:09 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah, what I was saying was that we are blocked on picking an approach for metrics, but not necessarily the full conversion. Clearly, if we pick the new metrics package we would need to implement the two metrics we want to quota on. But the conversion of the remaining metrics can be done asynchronously.

-Jay

On Thu, Mar 19, 2015 at 5:56 PM, Joel Koshy jjkosh...@gmail.com wrote:

in KAFKA-1930). I agree that this KIP doesn't need to block on the migration of the metrics package.

Can you clarify the above? i.e., if we are going to quota on something, then we would want to have migrated that metric over, right? Or do you mean we don't need to complete the migration of all metrics to the metrics package?

I think most of us now feel that delay + no error is a good approach, but it would be good to make sure everyone is on the same page. As Aditya requested a couple of days ago, I think we should go over this at the next KIP hangout.

Joel

On Thu, Mar 19, 2015 at 09:24:09AM -0700, Jun Rao wrote:

1. Delay + no error seems reasonable to me. However, I do feel that we need to give the client an indicator that it's being throttled, instead of doing this silently. For that, we probably need to evolve the produce/fetch protocol to include an extra status field in the response. We probably need to think more about whether we just want to return a simple status code (e.g., 1 = throttled) or a value that indicates how much is being throttled.

2. We probably need to improve the histogram support in the new metrics package before we can use it more widely on the server side (left a comment in KAFKA-1930). I agree that this KIP doesn't need to block on the migration of the metrics package.

Thanks,
Jun

On Wed, Mar 18, 2015 at 4:02 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Hey everyone, Thanks for the great discussion. There are currently a few points on this KIP that need addressing, and I want to make sure we are on the same page about those.

1. Append and delay response vs. delay and return error - I think we've discussed the pros and cons of each approach but haven't chosen an approach yet. Where does
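Jun's QuotaMetrics suggestion is essentially about hiding the metrics backend behind a small surface, so that the quota check is indifferent to whether rates come from the new metrics package or a Coda Hale wrapper. A rough sketch of that shape with a trivial stdlib stand-in backend; all names here are illustrative, not Kafka's or Coda Hale's APIs.

```java
// The quota check depends only on this interface; a QuotaMetrics built on a
// Coda Hale Meter, or on the new metrics package, would both fit behind it.
interface RateMetric {
    void record(double value, long nowMs);
    double rate(long nowMs);   // units per second
}

/** Trivial stand-in backend: one fixed window (a real one would keep several samples). */
class SimpleRate implements RateMetric {
    private final long windowMs;
    private long windowStart = -1;
    private double total;

    SimpleRate(long windowMs) { this.windowMs = windowMs; }

    public void record(double value, long nowMs) {
        if (windowStart < 0 || nowMs - windowStart >= windowMs) {
            windowStart = nowMs;
            total = 0;
        }
        total += value;
    }

    public double rate(long nowMs) {
        if (windowStart < 0) return 0.0;
        long elapsed = Math.max(1, Math.min(nowMs - windowStart, windowMs));
        return total * 1000.0 / elapsed;
    }
}

/** Quota check written only against the interface, independent of the backend. */
class QuotaMetrics {
    private final RateMetric metric;
    private final double quotaPerSec;

    QuotaMetrics(RateMetric metric, double quotaPerSec) {
        this.metric = metric;
        this.quotaPerSec = quotaPerSec;
    }

    boolean throttled(long nowMs) {
        return metric.rate(nowMs) > quotaPerSec;
    }
}
```

The design point is the one Jun raises: whichever backend sits behind the interface, all quota metrics then look the same to whoever exports them to Graphite or elsewhere.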
Re: [KIP-DISCUSSION] KIP-13 Quotas
From: Jay Kreps [jay.kr...@gmail.com]
Sent: Wednesday, March 18, 2015 2:10 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hey Steven,

The current proposal is actually to enforce quotas at the client/application level, NOT the topic level. So if you have a service with a few dozen instances, the quota is against all of those instances added up across all their topics. So actually the effect would be the same either way, but throttling gives the producer the choice of either blocking or dropping.

-Jay

On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote:

Jay, let's say an app produces to 10 different topics, and one of the topics is sent from a library. Due to whatever condition/bug, this lib starts to send messages over the quota. If we go with the delayed response approach, it will cause the whole shared RecordAccumulator buffer to be filled up. That will penalize the other 9 topics, which are within the quota. That is the unfairness point that Ewen and I were trying to make. If the broker just drops the msg and returns an error/status code indicating the drop and why, then the producer can just move on and accept the drop. The shared buffer won't be saturated, and the other 9 topics won't be penalized.

Thanks,
Steven

On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Steven,

It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple: a producer that wants to avoid this should stay under its quota. In other words, this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota: a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster.

-Jay

On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote:

Wait - we create one Kafka producer for each cluster, and each cluster can have many topics. If the producer buffer gets filled up due to a delayed response for one throttled topic, won't that penalize other topics unfairly? It seems to me that the broker should just return an error without delay. Sorry that I am chatting to myself :)

On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote:

I think I can answer my own question. A delayed response will cause the producer buffer to become full, which then results in either thread blocking or message drops.

On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote:

Please correct me if I am missing something here. I am not understanding how throttling would work without cooperation/back-off from the producer. The new Java producer supports a non-blocking API; why would a delayed response be able to slow down the producer? The producer will continue to fire async sends.

On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote:

I think we are really discussing two separate issues here:

1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches is actually effective.

2. For the OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some
Re: [KIP-DISCUSSION] KIP-13 Quotas
in KAFKA-1930). I agree that this KIP doesn't need to block on the migration of the metrics package. Can you clarify the above? i.e., if we are going to quota on something then we would want to have migrated that metric over right? Or do you mean we don't need to complete the migration of all metrics to the metrics package right? I think most of us now feel that the delay + no error is a good approach, but it would be good to make sure everyone is on the same page. As Aditya requested a couple of days ago I think we should go over this at the next KIP hangout. Joel On Thu, Mar 19, 2015 at 09:24:09AM -0700, Jun Rao wrote: 1. Delay + no error seems reasonable to me. However, I do feel that we need to give the client an indicator that it's being throttled, instead of doing this silently. For that, we probably need to evolve the produce/fetch protocol to include an extra status field in the response. We probably need to think more about whether we just want to return a simple status code (e.g., 1 = throttled) or a value that indicates how much is being throttled. 2. We probably need to improve the histogram support in the new metrics package before we can use it more widely on the server side (left a comment in KAFKA-1930). I agree that this KIP doesn't need to block on the migration of the metrics package. Thanks, Jun On Wed, Mar 18, 2015 at 4:02 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hey everyone, Thanks for the great discussion. There are currently a few points on this KIP that need addressing and I want to make sure we are on the same page about those. 1. Append and delay response vs delay and return error - I think we've discussed the pros and cons of each approach but haven't chosen an approach yet. Where does everyone stand on this issue? 2. Metrics Migration and usage in quotas - The metrics library in clients has a notion of quotas that we should reuse. For that to happen, we need to migrate the server to the new metrics package. 
- Need more clarification on how to compute throttling time and windowing for quotas. I'm going to start a new KIP to discuss metrics migration separately. That will also contain a section on quotas. 3. Dynamic Configuration management - Being discussed in KIP-5. Basically we need something that will model default quotas and allow per-client overrides. Is there something else that I'm missing? Thanks, Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Wednesday, March 18, 2015 2:10 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Hey Steven, The current proposal is actually to enforce quotas at the client/application level, NOT the topic level. So if you have a service with a few dozen instances the quota is against all of those instances added up across all their topics. So actually the effect would be the same either way but throttling gives the producer the choice of either blocking or dropping. -Jay
Re: [KIP-DISCUSSION] KIP-13 Quotas
Actually thinking again - since these will be a few new metrics at the client id level (bytes in and bytes out to start with) maybe it is fine to have the two types of metrics coexist and we can migrate the existing metrics in parallel.
Re: [KIP-DISCUSSION] KIP-13 Quotas
That is a valid concern but in that case I think it would be better to just migrate completely to the new metrics package first.
Re: [KIP-DISCUSSION] KIP-13 Quotas
Yeah, what I was saying was that we are blocked on picking an approach for metrics, but not necessarily the full conversion. Clearly if we pick the new metrics package we would need to implement the two metrics we want to quota on. But the conversion of the remaining metrics can be done asynchronously. -Jay
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hmm, I was thinking a bit differently on the metrics stuff. I think it would be confusing to have some metrics defined in the new metrics package while some others are defined in Coda Hale. Those metrics will look different (e.g., rates in Coda Hale will have special attributes such as 1-min-average). People may need different ways to export the metrics to external systems such as Graphite. So, instead of using the new metrics package on the broker, I was thinking that we can just implement a QuotaMetrics that wraps the Coda Hale metrics. The implementation can be the same as what's in the new metrics package. Thanks, Jun
RE: [KIP-DISCUSSION] KIP-13 Quotas
Some thoughts about metrics in quotas. I took a look at the new metrics package in clients/common and thought it was very nicely done. I especially liked using data points from current and previous windows. The Quota implementation is pretty general and I think we should use it in this quota proposal for the metrics we will add for quotas. Jay had mentioned that there was already some discussion previously about how to use the new metrics library for quotas, and I think this is quite similar. We can have bytes-in/bytes-out rate metrics per clientId configured with 'N' 1 second windows. I see that this is already supported in MetricConfig. Set the samples to 'n' (10?) and timeWindow to 1 second. This lets us precisely track the rate per second. A quota can also be configured when creating these metrics. This is simply the max value for this clientId, which is either the default or the overridden value as read from config when creating this Sensor. The code in KafkaApis or a helper class can be: try { quota.record(numBytes, time.currentTimeMillis()); } catch (QuotaViolationException ex) { delayedOperation = new DelayedOperation(ex.getDelayTime(), ...); quotaPurgatory.tryCompleteElseWatch(delayedOperation); } This assumes that the delay time is returned in the QuotaViolationException. The great thing about this is that we can catch any QuotaViolationException in the KafkaApis layer and it will have the delay time built into it. This lets us enforce quotas over any metric we track. Assume you are allocated 5MBps, the window is 10 seconds, and you are producing at exactly 5MBps. If you suddenly produce a 15MB batch in the next second, your quota is exceeded because you will have produced 60MB over the last 10 seconds (assuming for simplicity you produced exactly 45MB in the last 9 seconds). The time to delay is (bytes produced in window - quota bound) / quota limit per second: (60MB - 50MB) / 5MBps = 2 second delay.
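The worked example above can be written out as a small sketch. This is illustrative only; `ThrottleDelay` and `delayMs` are hypothetical names, not part of the proposed patch or the metrics package:

```java
// Hypothetical sketch of the delay computation described above.
// delay = (bytes produced in window - quota bound) / quota limit per second
public class ThrottleDelay {
    static long delayMs(double producedInWindowBytes, double quotaBytesPerSec, int windowSecs) {
        double bound = quotaBytesPerSec * windowSecs;   // max bytes allowed in the window
        double overage = producedInWindowBytes - bound; // bytes over the bound
        if (overage <= 0) return 0;                     // within quota: no delay
        // time (ms) needed to "pay back" the overage at the quota rate
        return (long) Math.ceil(overage / quotaBytesPerSec * 1000);
    }

    public static void main(String[] args) {
        // The example from the discussion: 5 MBps quota, 10 s window,
        // 45 MB in the first 9 s plus a sudden 15 MB batch = 60 MB total.
        double mb = 1024 * 1024;
        System.out.println(delayMs(60 * mb, 5 * mb, 10)); // 2000 ms, i.e. a 2 second delay
    }
}
```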
Alternatively, we can also compute this by calling sensor.checkQuotas(time) on some future time (in 1 second increments) and finding the first time unit at which the check doesn't fail. Thanks, Aditya
RE: [KIP-DISCUSSION] KIP-13 Quotas
Is it possible to discuss these 3 options during the next KIP hangout? Aditya From: Steven Wu [stevenz...@gmail.com] Sent: Tuesday, March 17, 2015 10:08 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Jay, let's say an app produces to 10 different topics. one of the topics is sent from a library. due to whatever condition/bug, this lib starts to send messages over the quota. if we go with the delayed response approach, it will cause the whole shared RecordAccumulator buffer to be filled up. that will penalize the other 9 topics that are within the quota. that is the unfairness point that Ewen and I were trying to make. if the broker just drops the msg and returns an error/status code indicating the drop and why, then the producer can just move on and accept the drop. the shared buffer won't be saturated and the other 9 topics won't be penalized. Thanks, Steven On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Steven, It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple, a producer that wants to avoid this should stay under its quota. In other words this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota, a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster. -Jay On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if the producer buffer gets filled up due to delayed responses for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return an error without delay.
sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. delayed response will cause the producer buffer to be full, which then result in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how would throttle work without cooperation/back-off from producer. new Java producer supports non-blocking API. why would delayed response be able to slow down producer? producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behaveness of the clients: option a) assumes the client sets an proper timeout value while can just ignore OKButThrottled response, while option b) assumes the client handles the FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches are actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes. Today we have already incorporated some status code as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this is of course using a single field for response status like the HTTP status codes, while the cons is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error codes semantics to the users so that users do not need to handle the codes one-by-one. 
More concretely, following Jay's example the client could write sth. like this: - if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request else if(error.needsRetry()) // throttled, transient error, etc: retry else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling - Only when the clients really want to handle, for example FailDuetoThrottled status code specifically, it needs to: if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request else if(error == FailDuetoThrottled ) // throttled: log it else if(error.needsRetry()) // transient error
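Steven's shared-buffer concern (one throttled topic filling the producer's whole RecordAccumulator and starving the other topics) can be made concrete with a toy model. This is plain illustrative Java, not Kafka client code; the class and method names are invented for the sketch. A single bounded buffer is shared by all topics, and a record's space is freed only when the broker responds, so delayed responses for one topic eventually make non-blocking sends fail for every topic:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (not Kafka code) of a producer with one shared, bounded buffer.
// All topics append into the same buffer; a record's space is only freed
// when the broker responds. If responses for one throttled topic are
// delayed, its records pin buffer space and sends for healthy topics fail.
class SharedBufferModel {
    private final int capacity;
    private final Deque<String> inFlight = new ArrayDeque<>();

    SharedBufferModel(int capacity) { this.capacity = capacity; }

    // Non-blocking send: returns false when the shared buffer is exhausted.
    boolean trySend(String topic) {
        if (inFlight.size() >= capacity) return false;
        inFlight.add(topic);
        return true;
    }

    // A broker response frees the oldest in-flight record's buffer space.
    void onResponse() { inFlight.poll(); }

    int used() { return inFlight.size(); }
}
```

With a capacity of 3, three delayed sends to a throttled topic leave no room for a fourth send to a healthy topic until some response (however late) arrives, which is exactly the unfairness being described.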
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hey everyone, Thanks for the great discussion. There are currently a few points on this KIP that need addressing, and I want to make sure we are on the same page about those.
1. Append-and-delay-response vs. delay-and-return-error - I think we've discussed the pros and cons of each approach but haven't chosen one yet. Where does everyone stand on this issue?
2. Metrics migration and usage in quotas - The metrics library in clients has a notion of quotas that we should reuse. For that to happen, we need to migrate the server to the new metrics package. - We need more clarification on how to compute throttling time and windowing for quotas. I'm going to start a new KIP to discuss metrics migration separately. That will also contain a section on quotas.
3. Dynamic configuration management - Being discussed in KIP-5. Basically we need something that will model default quotas and allow per-client overrides.
Is there something else that I'm missing? Thanks, Aditya
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think that is a good summary.
1. I'd favor delay over error -- I think we have a design where delay will basically mimic the same behavior you would get if you had a lower-capacity Kafka cluster all to yourself, which from my point of view is ideal. I'm aesthetically opposed to delay+error.
2. I actually don't think this is blocked on completing the metrics migration. I do think we need to figure out the direction, though, to decide what to do.
3. Yup.
-Jay
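Jay's "mimic a lower-capacity cluster" behavior implies the broker delays a response just long enough to bring the client's observed rate back down to its quota. One plausible formula for that delay is sketched below; this math is an assumption for illustration, not the KIP's final design, and `ThrottleCalc` is a hypothetical name:

```java
// Sketch (assumed math, not the KIP's final design): compute how long to
// delay a response so a client's effective byte rate drops to its quota,
// mimicking a cluster with lower capacity.
class ThrottleCalc {
    // bytesInWindow: bytes this client sent in the last windowMs milliseconds.
    // quotaBytesPerSec: the client's quota.
    // Returns the extra delay (ms) such that the effective rate over
    // (windowMs + delay) equals the quota; 0 if the client is under quota.
    static long throttleTimeMs(long bytesInWindow, long quotaBytesPerSec, long windowMs) {
        double observedRate = bytesInWindow * 1000.0 / windowMs;
        if (observedRate <= quotaBytesPerSec) return 0;
        // Solve bytesInWindow / (windowMs + delay) = quotaBytesPerSec / 1000 for delay.
        return Math.round(bytesInWindow * 1000.0 / quotaBytesPerSec - windowMs);
    }
}
```

For example, a client that sent 2000 bytes in a 1-second window against a 1000 bytes/sec quota would be delayed about 1 second, so over the 2 seconds it averages exactly its quota, just as it would on a half-capacity cluster it had to itself.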
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Steven, The current proposal is actually to enforce quotas at the client/application level, NOT the topic level. So if you have a service with a few dozen instances, the quota is against all of those instances added up across all their topics. So actually the effect would be the same either way, but throttling gives the producer the choice of either blocking or dropping. -Jay
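To make Jay's point concrete: client-level enforcement means every byte from a given client id counts against one bucket, regardless of which topic it targets. A minimal sketch of that accounting, under the assumption of a fixed window and with invented names (this is not Kafka's metrics API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of client-level (not topic-level) quota accounting: all bytes
// from one client id share a single bucket, whatever topic they go to.
// Class and method names are illustrative, not Kafka APIs; window reset
// is omitted for brevity.
class ClientQuotaTracker {
    private final long quotaBytesPerWindow;
    private final Map<String, Long> usage = new HashMap<>();

    ClientQuotaTracker(long quotaBytesPerWindow) { this.quotaBytesPerWindow = quotaBytesPerWindow; }

    // Record a produce and report whether the client is now over quota.
    // Note the topic plays no part in the decision.
    boolean recordAndCheckOverQuota(String clientId, String topic, long bytes) {
        long total = usage.merge(clientId, bytes, Long::sum);
        return total > quotaBytesPerWindow;
    }
}
```

Under this model, a client spreading writes across ten topics trips the quota just as fast as one writing to a single topic, which is the aggregation behavior described above.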
Re: [KIP-DISCUSSION] KIP-13 Quotas
Ewen, 1. I think we are on the same page about malicious clients: they should not be the target of either approach. I was just trying to separate the discussion from "what if the user just keeps retrying", and maybe I was not clear. 2. I was not advocating option A on the wiki. In my previous email I actually assumed that option is already dropped and that we are only considering option B (which is my option b) in the email) and C (option a) in my email), and I think that with some proper wrapping of status codes (today we still call them error codes), option B in the wiki may not necessarily require people who implement clients to handle each status code one-by-one. Guozhang
Re: [KIP-DISCUSSION] KIP-13 Quotas
wait. we create one kafka producer for each cluster, and each cluster can have many topics. if the producer buffer got filled up due to delayed responses for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return an error without delay. sorry that I am chatting to myself :)
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think I can answer my own question. A delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drops.

On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. The new Java producer supports a non-blocking API. Why would a delayed response be able to slow down the producer? The producer will continue to fire async sends.

On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value and can just ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse. The pro of this is of course using a single field for response status, like the HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error-code semantics for the users so that users do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth. like this:

    if (error.isOK())
        // status code is good or can simply be ignored for this request type: process the request
    else if (error.needsRetry())
        // throttled, transient error, etc: retry
    else if (error.isFatal())
        // non-retriable errors, etc: notify / terminate / other handling

Only when a client really wants to handle, for example, the FailDuetoThrottled status code specifically does it need to:

    if (error.isOK())
        // status code is good or can simply be ignored for this request type: process the request
    else if (error == FailDuetoThrottled)
        // throttled: log it
    else if (error.needsRetry())
        // transient error, etc: retry
    else if (error.isFatal())
        // non-retriable errors, etc: notify / terminate / other handling

And for implementation we can probably group the codes like HTTP status codes, such that we can do:

    boolean Error.isOK() { return code < 300 && code >= 200; }

Guozhang

On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes.

On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense); isn't it a similar case?

On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes, the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean "why we failed your request", for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay

On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think it's not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do "else throw" will throw on non-errors), but perhaps
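Guozhang's grouped-status-code sketch can be rendered as runnable Java. The concrete code values and range boundaries below are invented for illustration (the thread never fixes them); the point is only that a generic client branches on the category while a quota-aware client can still test for one specific code:

```java
// Runnable rendering of the grouped-status-code idea from the thread.
// The specific numeric codes and HTTP-like ranges are illustrative
// assumptions, not part of the Kafka protocol.
class StatusCode {
    static final int OK = 200;
    static final int OK_BUT_THROTTLED = 210;      // hypothetical: success, but throttled
    static final int FAIL_DUE_TO_THROTTLED = 310; // hypothetical: retriable failure
    static final int UNKNOWN_FATAL = 410;         // hypothetical: non-retriable failure

    final int code;
    StatusCode(int code) { this.code = code; }

    boolean isOK()       { return code >= 200 && code < 300; } // success range
    boolean needsRetry() { return code >= 300 && code < 400; } // retriable range
    boolean isFatal()    { return code >= 400; }               // non-retriable range
}
```

A client that doesn't care about throttling writes only the three category branches; one that does can add `if (status.code == StatusCode.FAIL_DUE_TO_THROTTLED)` before the generic retry branch, which is exactly the two-tier handling the email describes.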
Re: [KIP-DISCUSSION] KIP-13 Quotas
Steven - that's a reasonable concern. I think I've mentioned the same sort of issue in the issues about the new producer's RecordAccumulator not timing out sends, e.g. in https://issues.apache.org/jira/browse/KAFKA-1788. The shared buffer causes problems if one broker isn't available for awhile since messages to that broker end up consuming the entire buffer. You can end up with a similar problem here due to the effective rate limiting caused by delaying responses. Guozhang - I think only option A from the KIP is actually an error. If we want to look to HTTP for examples, there's an RFC that defines the Too Many Requests response to handle rate limiting: http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is an error, specifically a client error since its in the 400 range.The implication from the status code (429), name of the response, and the example given is that is is an error and no real data is returned, which would correspond to option A from the KIP. Note that the protocol provides a mechanism for giving extra (optional) information about when you should retry (via headers). I'd guess that even despite that, most systems that encounter a 429 use some ad hoc backoff mechanism because they only try to detect anything in the 400 range... One additional point -- I think malicious clients shouldn't be our target here, they can do a lot worse than what's been addressed in this thread. But I do agree that any proposal should have a clear explanation of how existing clients that are ignorant of quotas would behave (which is why options b and c make a lot of sense -- they rate limit without requiring an update to normally-behaving clients). On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if producer buffer got filled up due to delayed response for one throttled topic, won't that penalize other topics unfairly? 
it seems to me that the broker should just return an error without delay. sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse; the pro of this is of course using a single field for response status, like HTTP status codes, while the con is that it requires clients to handle the error codes carefully.
I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error code semantics for users so that they do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth. like this:

if (error.isOK())            // status code is good, or the code can simply be ignored for this request type: process the request
else if (error.needsRetry()) // throttled, transient error, etc: retry
else if (error.isFatal())    // non-retriable errors, etc: notify / terminate / other handling

Only when a client really wants to handle, for example, the FailDuetoThrottled status code specifically does it need to:

if (error.isOK())                     // status code is good, or the code can simply be ignored for this request type: process the request
else if (error == FailDuetoThrottled) // throttled: log it
else if (error.needsRetry())          // transient error, etc: retry
else
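Guozhang's grouping could be sketched in Java roughly as follows. Everything here is a hypothetical illustration - the class name, the numeric ranges, and codes like OK_BUT_THROTTLED are invented for this sketch and are not part of the Kafka protocol:

```java
// Hypothetical sketch of the "wrap the error code semantics" idea: group codes
// into numeric ranges, HTTP-style, so a client branches on categories
// (isOK / needsRetry / isFatal) instead of handling every code one-by-one.
// None of these names or ranges exist in the actual Kafka protocol.
public class StatusCode {
    public static final int OK = 200;
    public static final int OK_BUT_THROTTLED = 250;      // success with side information
    public static final int FAIL_DUE_TO_THROTTLED = 429; // request dropped, retry later
    public static final int UNKNOWN_SERVER_ERROR = 500;

    public static boolean isOK(int code)       { return code >= 200 && code < 300; }
    public static boolean needsRetry(int code) { return code >= 400 && code < 500; }
    public static boolean isFatal(int code)    { return code >= 500; }

    // The default path most clients would take: categories only.
    public static String defaultHandling(int code) {
        if (isOK(code))       return "process";
        if (needsRetry(code)) return "retry";
        if (isFatal(code))    return "terminate";
        return "unknown";
    }

    public static void main(String[] args) {
        // A throttled-but-successful append is still "process" by default; only
        // a client that opts in would check OK_BUT_THROTTLED specifically.
        System.out.println(defaultHandling(OK_BUT_THROTTLED));
        System.out.println(defaultHandling(FAIL_DUE_TO_THROTTLED));
    }
}
```

Grouping by numeric range is what makes the default branch cheap: a client that does not care about throttling never has to mention it.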
Re: [KIP-DISCUSSION] KIP-13 Quotas
please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse; the pro of this is of course using a single field for response status, like HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error code semantics for users so that they do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth.
like this:

if (error.isOK())            // status code is good, or the code can simply be ignored for this request type: process the request
else if (error.needsRetry()) // throttled, transient error, etc: retry
else if (error.isFatal())    // non-retriable errors, etc: notify / terminate / other handling

Only when a client really wants to handle, for example, the FailDuetoThrottled status code specifically does it need to:

if (error.isOK())                     // status code is good, or the code can simply be ignored for this request type: process the request
else if (error == FailDuetoThrottled) // throttled: log it
else if (error.needsRetry())          // transient error, etc: retry
else if (error.isFatal())             // non-retriable errors, etc: notify / terminate / other handling

And for implementation we can probably group the codes accordingly, like HTTP status codes, such that we can do: boolean Error.isOK() { return code < 300 && code >= 200; } Guozhang On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes. On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case?
On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes, the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean "why we failed your request", for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think it's not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do "else throw" will throw on non-errors), but perhaps it's worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to
Re: [KIP-DISCUSSION] KIP-13 Quotas
Ewen, I see your point regarding the shared buffer. yes, a bad/slow broker could potentially consume all of the buffer. On the other hand, I do like the batching behavior of the shared RecordAccumulator buffer. On Tue, Mar 17, 2015 at 8:25 AM, Guozhang Wang wangg...@gmail.com wrote: Ewen, 1. I think we are on the same page regarding malicious clients: they should not be the target of either approach. I was just trying to separate the discussion from "what if the user just keeps retrying", and maybe I was not clear. 2. I was not advocating option A on the wiki; in my previous email I actually assume that option is already dropped and we are only considering option B (which is my option b) in the email) and C (option a) in my email), and I think with some proper wrapping of status codes (today we still call them error codes) option B in the wiki may not necessarily require people who implement clients to handle each status code one-by-one. Guozhang On Tue, Mar 17, 2015 at 12:22 AM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven - that's a reasonable concern. I think I've mentioned the same sort of issue in the issues about the new producer's RecordAccumulator not timing out sends, e.g. in https://issues.apache.org/jira/browse/KAFKA-1788 . The shared buffer causes problems if one broker isn't available for a while, since messages to that broker end up consuming the entire buffer. You can end up with a similar problem here due to the effective rate limiting caused by delaying responses. Guozhang - I think only option A from the KIP is actually an error.
If we want to look to HTTP for examples, there's an RFC that defines the Too Many Requests response to handle rate limiting: http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is an error, specifically a client error since it's in the 400 range. The implication from the status code (429), the name of the response, and the example given is that it is an error and no real data is returned, which would correspond to option A from the KIP. Note that the protocol provides a mechanism for giving extra (optional) information about when you should retry (via headers). I'd guess that even despite that, most systems that encounter a 429 use some ad hoc backoff mechanism because they only try to detect anything in the 400 range... One additional point -- I think malicious clients shouldn't be our target here; they can do a lot worse than what's been addressed in this thread. But I do agree that any proposal should have a clear explanation of how existing clients that are ignorant of quotas would behave (which is why options b and c make a lot of sense -- they rate limit without requiring an update to normally-behaving clients). On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if producer buffer got filled up due to delayed response for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return an error without delay. sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer.
the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this
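For reference, the RFC 6585 behavior Ewen describes (a 429 is an error, with an optional Retry-After hint, and clients without specific handling fall back to ad hoc backoff) can be sketched like this. The helper is hypothetical, not an API from any HTTP library:

```java
import java.util.Optional;

// Sketch of the RFC 6585 pattern: a 429 Too Many Requests response is an
// error (no data returned), optionally carrying a Retry-After header in
// seconds. A client that only checks "is it 4xx?" never sees the hint and
// falls back to its own ad hoc backoff. The helper below is invented for
// illustration.
public class RetryAfterDemo {
    static long backoffMs(int status, Optional<String> retryAfterHeader, long defaultBackoffMs) {
        if (status != 429) return 0;                        // not throttled: no backoff
        return retryAfterHeader
                .map(s -> Long.parseLong(s.trim()) * 1000)  // Retry-After is in seconds
                .orElse(defaultBackoffMs);                  // ad hoc fallback
    }

    public static void main(String[] args) {
        System.out.println(backoffMs(429, Optional.of("3"), 1000)); // server-provided hint: 3000
        System.out.println(backoffMs(429, Optional.empty(), 1000)); // ad hoc fallback: 1000
        System.out.println(backoffMs(200, Optional.empty(), 1000)); // no throttling: 0
    }
}
```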
Re: [KIP-DISCUSSION] KIP-13 Quotas
Yes, I think there are two policy issues, but I don't think they are separate/mutually exclusive for the purposes of this discussion - the first being what the broker should do if a quota is violated, and the second being what it should return to the client (error code or status code). (The separate discussion, unrelated to quotas, is whether in general we want to include status codes with errors - e.g., ReplicaNotAvailable in the topic metadata response - and I think everyone agrees that we should not.) When we first started this thread I was leaning more toward "error code and immediate response" (so the client can react accordingly), but after this discussion I'm no longer convinced about that. (The error code is appropriate, I think, in this case since the request is actually dropped due to a quota violation.) An issue with this is that the broker cannot effectively protect itself against simple clients that don't back off properly. I actually think this may not be a huge issue though, because regardless of quotas there needs to be lower-level protection against DoS - i.e., this applies even for the hold-and-respond approach. Something other than a Kafka client can flood a network. If the policy is to reject the request and respond immediately (or wait up to the current request timeout) on a quota violation, then an error code is appropriate (since the append was rejected). With the second approach (for a producer request: do the append, hold the request, and then respond), an error code does not really make sense. The main concern here is the request timeout. I agree with Jay that if we improve the semantics of the timeout (possibly adding a separate request timeout) then this approach would be less controversial. I.e., for producer requests there should be two timeouts - a replication timeout and a request timeout, the latter being very large.
One nuance to this: I think it should be a broker-side setting (not client-side) that is communicated to the client somehow, since the client needs to know in advance a ceiling on how long it can expect to wait for a response. So if the request succeeds immediately or fails due to a usual error (e.g., a slow replica and therefore a replication timeout) the client will get a response within the replication timeout. Otherwise, it may block until the full request timeout if a quota is violated. Both approaches ideally need some negotiation - in the first approach, the client should be told its current quota, from which it can estimate how long it should back off. In the second approach, the client needs to know how long a request may be held, and the broker enforces backoff up to this limit on quota violations. The latter seems simpler for client implementation. Thanks, Joel On Mon, Mar 16, 2015 at 10:58:02PM -0700, Guozhang Wang wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g.
ReplicaNotAvailable in MetadataResponse; the pro of this is of course using a single field for response status, like HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error code semantics for users so that they do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth. like this:

if (error.isOK())            // status code is good, or the code can simply be ignored for this request type: process the request
else if (error.needsRetry()) // throttled, transient error, etc: retry
else if (error.isFatal())    // non-retriable errors, etc: notify / terminate / other handling

Only when a client really wants to handle, for example, the FailDuetoThrottled status code specifically does it need to:

if (error.isOK())                     // status code is good, or the code can simply be ignored for this request type: process the request
else if (error == FailDuetoThrottled) // throttled: log it
else if (error.needsRetry())          // transient error, etc: retry
else if (error.isFatal())             // non-retriable errors, etc:
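To make the hold-and-respond policy Joel describes concrete, here is a rough sketch of how a broker might compute a bounded delay for a quota violation: long enough that the average rate over the window plus the delay comes back down to the quota, but never exceeding an advertised ceiling. The formula and the cap are assumptions for illustration only; the thread does not pin down either:

```java
// Hypothetical sketch of the "append, hold the response, then reply" policy.
// The delay is proportional to how far the client is over its quota, capped
// by a broker-side maximum so the client can bound how long it may wait.
// Formula and cap are invented for illustration, not from the KIP.
public class ThrottleDelay {
    static long delayMs(double observedBytesPerSec, double quotaBytesPerSec,
                        long windowMs, long maxDelayMs) {
        if (observedBytesPerSec <= quotaBytesPerSec) return 0;
        // Delay long enough that the average rate over (window + delay) == quota.
        double overage = observedBytesPerSec - quotaBytesPerSec;
        long delay = (long) (overage / quotaBytesPerSec * windowMs);
        return Math.min(delay, maxDelayMs);  // never exceed the advertised ceiling
    }

    public static void main(String[] args) {
        System.out.println(delayMs(1500, 1000, 1000, 30000));  // 50% over quota: 500
        System.out.println(delayMs(900, 1000, 1000, 30000));   // under quota: 0
        System.out.println(delayMs(100000, 1000, 1000, 3000)); // capped at 3000
    }
}
```

The cap is what makes Joel's negotiation point workable: the client only needs to learn the ceiling, not the exact delay formula.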
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Steven, It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple: a producer that wants to avoid this should stay under its quota. In other words, this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota: a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster. -Jay On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if producer buffer got filled up due to delayed response for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return an error without delay. sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests.
Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse; the pro of this is of course using a single field for response status, like HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error code semantics for users so that they do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth.
like this:

if (error.isOK())            // status code is good, or the code can simply be ignored for this request type: process the request
else if (error.needsRetry()) // throttled, transient error, etc: retry
else if (error.isFatal())    // non-retriable errors, etc: notify / terminate / other handling

Only when a client really wants to handle, for example, the FailDuetoThrottled status code specifically does it need to:

if (error.isOK())                     // status code is good, or the code can simply be ignored for this request type: process the request
else if (error == FailDuetoThrottled) // throttled: log it
else if (error.needsRetry())          // transient error, etc: retry
else if (error.isFatal())             // non-retriable errors, etc: notify / terminate / other handling

And for implementation we can probably group the codes accordingly, like HTTP status codes, such that we can do: boolean Error.isOK() { return code < 300 && code >= 200; } Guozhang On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the
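The dedicated-field design Jay and Ewen are arguing for might look something like this sketch, where failure and throttling travel in independent fields so both can apply to the same response. The field names are illustrative, not the actual Kafka wire format:

```java
// Sketch of the "separate field" design: the error code answers only "why did
// the request fail", while throttling info travels in its own field, so both
// can be set independently (e.g. a quota'd response that also hit a replica
// error). Field names are invented for illustration.
public class ProduceResponse {
    final short errorCode;    // 0 == no error
    final int throttleTimeMs; // 0 == not throttled; side information, not an error

    ProduceResponse(short errorCode, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.throttleTimeMs = throttleTimeMs;
    }

    boolean failed()    { return errorCode != 0; }
    boolean throttled() { return throttleTimeMs > 0; }

    public static void main(String[] args) {
        ProduceResponse r = new ProduceResponse((short) 0, 150);
        // Append succeeded AND the client was throttled -- no conflict between
        // the two pieces of information, unlike overloaded error codes.
        System.out.println(r.failed() + " " + r.throttled());
    }
}
```

This directly resolves Jay's "what if two such codes apply" objection: each piece of information has its own slot.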
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Guozhang, Cool, I think we are mostly on the same page. I think I agree with what you are saying; the error code thing is basically a matter of taste. It would be possible to put all kinds of things as error codes, and it would be possible to devise a scheme for clients to deal with it--for example Gwen's reserved error ranges. As a matter of taste, I really do think a protocol with a clear definition of error as "could not do what you asked, here is the reason" is preferable to this. Basically what I am advocating is (1) reversing the existing case where we used an error code to encode side information, (2) not doing that any more and instead using dedicated fields in the response, and (3) adding a clear definition of error to the protocol definition that formalizes this for client developers. I think I am particularly sensitive on this point because I spent a lot of time on clients (as you did) and error handling was really like 50% of the effort. I also just think that having a clear, well-designed, tasteful protocol is an asset in its own right. -Jay On Tue, Mar 17, 2015 at 8:25 AM, Guozhang Wang wangg...@gmail.com wrote: Ewen, 1. I think we are on the same page regarding malicious clients: they should not be the target of either approach. I was just trying to separate the discussion from "what if the user just keeps retrying", and maybe I was not clear. 2. I was not advocating option A on the wiki; in my previous email I actually assume that option is already dropped and we are only considering option B (which is my option b) in the email) and C (option a) in my email), and I think with some proper wrapping of status codes (today we still call them error codes) option B in the wiki may not necessarily require people who implement clients to handle each status code one-by-one. Guozhang On Tue, Mar 17, 2015 at 12:22 AM, Ewen Cheslack-Postava e...@confluent.io wrote: Steven - that's a reasonable concern.
I think I've mentioned the same sort of issue in the issues about the new producer's RecordAccumulator not timing out sends, e.g. in https://issues.apache.org/jira/browse/KAFKA-1788 . The shared buffer causes problems if one broker isn't available for a while, since messages to that broker end up consuming the entire buffer. You can end up with a similar problem here due to the effective rate limiting caused by delaying responses. Guozhang - I think only option A from the KIP is actually an error. If we want to look to HTTP for examples, there's an RFC that defines the Too Many Requests response to handle rate limiting: http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is an error, specifically a client error since it's in the 400 range. The implication from the status code (429), the name of the response, and the example given is that it is an error and no real data is returned, which would correspond to option A from the KIP. Note that the protocol provides a mechanism for giving extra (optional) information about when you should retry (via headers). I'd guess that even despite that, most systems that encounter a 429 use some ad hoc backoff mechanism because they only try to detect anything in the 400 range... One additional point -- I think malicious clients shouldn't be our target here; they can do a lot worse than what's been addressed in this thread. But I do agree that any proposal should have a clear explanation of how existing clients that are ignorant of quotas would behave (which is why options b and c make a lot of sense -- they rate limit without requiring an update to normally-behaving clients). On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if producer buffer got filled up due to delayed response for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return an error without delay.
sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
Re: [KIP-DISCUSSION] KIP-13 Quotas
Jay, let's say an app produces to 10 different topics. one of the topics is sent from a library. due to whatever condition/bug, this lib starts to send messages over the quota. if we go with the delayed response approach, it will cause the whole shared RecordAccumulator buffer to be filled up. that will penalize the other 9 topics, which are within the quota. that is the unfairness point that Ewen and I were trying to make. if the broker just drops the msg and returns an error/status code that indicates the drop and why, then the producer can just move on and accept the drop. the shared buffer won't be saturated and the other 9 topics won't be penalized. Thanks, Steven On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Steven, It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple: a producer that wants to avoid this should stay under its quota. In other words, this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota: a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster. -Jay On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if producer buffer got filled up due to delayed response for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return an error without delay. sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drop.
On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse; the pro of this is of course using a single field for response status, like HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error code semantics for users so that they do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth.
like this:

if (error.isOK())            // status code is good, or the code can simply be ignored for this request type: process the request
else if (error.needsRetry()) // throttled, transient error, etc: retry
else if (error.isFatal())    // non-retriable errors, etc: notify / terminate / other handling

Only when a client really wants to handle, for example, the FailDuetoThrottled status code specifically does it need to:

if (error.isOK())                     // status code is good, or the code can simply be ignored for this request type: process the request
else if (error == FailDuetoThrottled) // throttled: log it
else if (error.needsRetry())          // transient error, etc: retry
else if (error.isFatal())             // non-retriable errors, etc: notify / terminate / other handling

And for implementation we can probably group the codes accordingly, like HTTP status codes, such that we can do:
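Steven's unfairness scenario can be illustrated with a toy simulation of a shared, bounded buffer: records for one throttled topic never drain, so eventually sends to every topic are rejected. This is not the real RecordAccumulator; all names and numbers are invented:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy simulation of the shared-buffer unfairness point: one throttled topic
// whose sends never complete gradually fills the producer's shared buffer,
// after which sends to *every* topic block or drop.
public class SharedBufferDemo {
    static int simulate(int capacity, int sends) {
        Deque<String> buffer = new ArrayDeque<>();
        String[] topics = {"throttled", "t1", "t2"};
        int rejected = 0;
        for (int i = 0; i < sends; i++) {
            String topic = topics[i % topics.length];
            if (buffer.size() < capacity) buffer.add(topic);
            else rejected++;  // shared buffer full: the send blocks or drops
            // Responses for unthrottled topics arrive promptly, freeing their
            // slots; the throttled topic's records stay pinned in the buffer.
            buffer.removeIf(t -> !t.equals("throttled"));
        }
        return rejected;
    }

    public static void main(String[] args) {
        // With a 10-slot buffer, the throttled topic fills it by the 28th send;
        // the remaining 32 sends (to all three topics) are rejected.
        System.out.println(simulate(10, 60));
    }
}
```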
RE: [KIP-DISCUSSION] KIP-13 Quotas
In Jay's approach, a client will simply experience a delay in receiving a response. The primary benefit is that there are no concerns regarding data loss, because the data has already been appended. Retries are also a non-issue since there is no need for them. However, the drawback to "append and delay" is that if the socket timeout is reached (30-second default, I believe), the client can disconnect and try to resend the batch to the server. This will cause data duplication, since the server cannot distinguish duplicate batches. However, it is very likely that the maximum quota delay will be lower than the socket timeout unless someone explicitly overrides it. We can make this even more unlikely by having a fixed lower bound on the socket timeout (10 seconds?). In this approach we must also ignore the request timeout, since a small timeout will completely bypass quotas. In the other approach, assuming the client only retries a fixed number of times, it will eventually experience data loss since the producer will drop the batch at some point. IMO, it is more likely that we will see this issue in production than the other issues identified above. I agree with Jay that we can delay the request longer than the request timeout since it isn't possible to enforce perfectly on the server anyway. I think that we should have a maximum delay config on the server that provides a ceiling on the longest we can delay a request, and have it be lower than the socket timeout. Initially, I preferred "delay and error" because it seemed like the most natural way to handle quota violations... but I'm starting to see the merit in Jay's approach. Practically speaking, it reduces the number of moving parts in delivering quotas for Kafka. All changes are localized to the broker and are compatible with existing clients. Client changes will be required only if we return quota metadata in the responses or add a quota metadata API. If we discover in production that this isn't working for some reason...
we can always revisit this approach of returning errors and having the clients handle them. Note that both these data loss/duplicate issues only affect the producer. Consumers should be fine regardless of the approach we choose. Aditya From: Jun Rao [j...@confluent.io] Sent: Monday, March 16, 2015 4:27 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya
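The append-then-delay bookkeeping described above (delay the response just enough to bring the client back under quota, with a server-side ceiling kept below the socket timeout) can be sketched roughly as follows. This is a minimal illustration assuming a simple bytes-per-second quota; the class and parameter names (QuotaDelay, maxDelayMs) are made up for the example and do not come from the actual patches.

```java
// Hypothetical sketch of the "append, then delay the response" idea.
// Names are illustrative, not from the Kafka codebase.
public class QuotaDelay {
    private final double quotaBytesPerSec; // per-client quota
    private final long maxDelayMs;         // ceiling, kept below the socket timeout

    public QuotaDelay(double quotaBytesPerSec, long maxDelayMs) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.maxDelayMs = maxDelayMs;
    }

    // Delay the response just long enough that the client's observed
    // throughput over (windowMs + delay) drops back to the quota,
    // capped so the delay never risks hitting the client's socket timeout.
    public long delayMs(double observedBytes, long windowMs) {
        double idealWindowMs = observedBytes / quotaBytesPerSec * 1000.0;
        long delay = (long) Math.max(0.0, idealWindowMs - windowMs);
        return Math.min(delay, maxDelayMs);
    }
}
```

Note how the cap is what makes the duplication scenario above unlikely: the broker never holds a response longer than maxDelayMs, so as long as that ceiling is below the client's socket timeout, the client will not disconnect and resend.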
Re: [KIP-DISCUSSION] KIP-13 Quotas
Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes. On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case? On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think it's not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps it's worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that.
[...] -- Thanks, Ewen
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to encode other information about the response is super dangerous. The error handling is one of the hardest parts of the client (Guozhang chime in here). Generally the error handling looks like if(error == none) // good, process the request else if(error == KNOWN_ERROR_1) // handle known error 1 else if(error == KNOWN_ERROR_2) // handle known error 2 else throw Errors.forCode(error).exception(); // or some other default behavior This works because we have a convention that an error is something that prevented you from getting the response, so the default handling case is sane and forward compatible. It is tempting to use the error code to convey information in the success case. For example we could use error codes to encode whether quotas were enforced, whether the request was served out of cache, whether the stock market is up today, or whatever. The problem is that since these are not errors as far as the client is concerned it should not throw an exception but process the response, but now we created an explicit requirement that that error be handled explicitly since it is different. I really think that this kind of information is not an error, it is just information, and if we want it in the response we should do the right thing and add a new field to the response. I think you saw the Samza bug that was literally an example of this happening and leading to an infinite retry loop. Furthermore I really want to emphasize that hitting your quota in the design that Adi has proposed is actually not an error condition at all. It is totally reasonable in any bootstrap situation to intentionally want to run at the limit the system imposes on you. -Jay On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote: It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting).
From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya
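The dispatch convention Jay describes can be shown as a compilable sketch. The enum values and handler bodies here are illustrative stand-ins, not the real org.apache.kafka.common.protocol.Errors class.

```java
// Minimal stand-in for the client-side error dispatch described above.
// Enum values and handlers are illustrative, not Kafka's actual API.
public class ErrorDispatch {
    enum Errors {
        NONE, KNOWN_ERROR_1, KNOWN_ERROR_2, UNKNOWN;
        RuntimeException exception() { return new RuntimeException(name()); }
    }

    static String handle(Errors error) {
        if (error == Errors.NONE)
            return "processed response";   // good, process the request
        else if (error == Errors.KNOWN_ERROR_1)
            return "handled error 1";      // handle known error 1
        else if (error == Errors.KNOWN_ERROR_2)
            return "handled error 2";      // handle known error 2
        else
            throw error.exception();       // default: unknown codes surface as exceptions
    }
}
```

The forward compatibility comes from the final branch: any code the client does not recognize is treated as a failure. That is exactly why encoding a successful-but-throttled response as an "error" is dangerous: an older client would throw (and likely retry) on a response that actually succeeded.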
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think it's not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps it's worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: [...]
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value and can simply ignore the OKButThrottled response, while option b) assumes the client handles FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse. The pro of this is, of course, using a single field for response status like the HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error-code semantics for users so that they do not need to handle the codes one by one. More concretely, following Jay's example the client could write something
like this:

if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request
else if(error.needsRetry()) // throttled, transient error, etc: retry
else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling

Only when a client really wants to handle, for example, the FailDuetoThrottled status code specifically does it need to:

if(error.isOK()) // status code is good or the code can be simply ignored for this request type, process the request
else if(error == FailDuetoThrottled) // throttled: log it
else if(error.needsRetry()) // transient error, etc: retry
else if(error.isFatal()) // non-retriable errors, etc: notify / terminate / other handling

And for implementation we can probably group the codes like HTTP status codes, such that we can do:

boolean Error.isOK() { return code < 300 && code >= 200; }

Guozhang On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: [...]
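Guozhang's grouped-code idea could be sketched as follows. The HTTP-style ranges are an assumption extrapolated from his isOK() example; no such ranges were ever agreed upon for the Kafka protocol.

```java
// Sketch of grouping status codes into HTTP-like ranges so clients can
// handle whole classes of codes instead of each code individually.
// The ranges are illustrative assumptions, not an agreed Kafka protocol.
public class StatusCode {
    private final int code;

    public StatusCode(int code) { this.code = code; }

    public boolean isOK()       { return code >= 200 && code < 300; } // success, incl. OKButThrottled
    public boolean needsRetry() { return code >= 300 && code < 400; } // throttled / transient
    public boolean isFatal()    { return code >= 400; }               // non-retriable
}
```

This gives the single-field design a safe default: a client that only checks isOK() and needsRetry() remains correct when a new code is added within an existing range, which addresses Jay's forward-compatibility concern without adding a second field.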
Re: [KIP-DISCUSSION] KIP-13 Quotas
We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case? On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: [...]
Re: [KIP-DISCUSSION] KIP-13 Quotas
You are right, shoe-horning status into an error field is a bad idea. While many projects use a single status field to indicate different error and non-error states, it doesn't seem like a good fit for the current Kafka implementation. Do you think that adding a status field to our protocol is feasible at this point? On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: [...]
Re: [KIP-DISCUSSION] KIP-13 Quotas
My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: [...]
Re: [KIP-DISCUSSION] KIP-13 Quotas
It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya
Re: [KIP-DISCUSSION] KIP-13 Quotas
We're getting off the KIP a little bit here, but while I understand the idea of not honoring the request timeout, I think that the broker ignoring it without telling the client is not a good implementation. It gets back to a larger discussion, which I was mentioning, of just negotiating with (or at least notifying) the client of important configuration details for the cluster. Any configuration, like a minimum timeout value or the maximum message size, that has to be configured the same on both the broker and the client, or where the client has a required minimum value for a setting (like fetch size), should be clearly stated by the broker in a handshake. Don't you get frustrated when you sign up for an account on a website, select a nice secure password, and then get told after submission "Your password is invalid - it must be between 6 and 14 characters"? Then on the next submission you get told "Your password is invalid - it can only contain certain symbols", and they don't tell you what symbols are allowed? Why didn't they just tell you all that up front so you could get it right the first time? -Todd On Wed, Mar 11, 2015 at 10:22 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Todd, Yeah it is kind of weird to do the quota check after taking a request, but since the penalty is applied during that request and it just delays you to the right rate, I think it isn't exactly wrong. I admit it is weird, though. What you say about closing the connection makes sense. The issue is that our current model for connections is totally transient. The clients are supposed to handle any kind of transient connection loss and just re-establish. So basically all existing clients would likely just retry all the same whether you closed the connection or not, so at the moment there would be no way to know a retried request is actually a retry. Your point about the REST proxy is a good one, I don't think we had considered that.
Currently the java producer just has a single client.id for all requests so the rest proxy would be a single client. But actually what you want is the original sender to be the client. This is technically very hard to do because the client will actually be batching records from all senders together into one request so the only way to get the client id right would be to make a new producer for each rest proxy client and this would mean a lot of memory and connections. This needs thought, not sure what solution there is. I am not 100% convinced we need to obey the request timeout. The configuration issue actually isn't a problem because the request timeout is sent with the request so the broker actually knows it now even without a handshake. However the question is, if someone sets a pathologically low request timeout do we need to obey it? and if so won't that mean we can't quota them? I claim the answer is no! I think we should redefine request timeout to mean replication timeout, which is actually what it is today. Even today if you interact with a slow server it may take longer than that timeout (say because the fs write queues up for a long-ass time). I think we need a separate client timeout which should be fairly long and unlikely to be hit (default to 30 secs or something). -Jay On Tue, Mar 10, 2015 at 10:12 AM, Todd Palino tpal...@gmail.com wrote: Thanks, Jay. On the interface, I agree with Aditya (and you, I believe) that we don't need to expose the public API contract at this time, but structuring the internal logic to allow for it later with low cost is a good idea. Glad you explained the thoughts on where to hold requests. While my gut reaction is to not like processing a produce request that is over quota, it makes sense to do it that way if you are going to have your quota action be a delay. On the delay, I see your point on the bootstrap cases. 
However, one of the places I differ, and part of the reason that I prefer the error, is that I would never allow a producer who is over quota to resend a produce request. A producer should identify itself at the start of its connection, and at that point if it is over quota, the broker would return an error and close the connection. The same goes for a consumer. I'm a fan, in general, of pushing all error cases and handling down to the client and doing as little special work to accommodate those cases on the broker side as possible. A case to consider here is what does this mean for REST endpoints to Kafka? Are you going to hold the HTTP connection open as well? Is the endpoint going to queue and hold requests? I think the point that we can only delay as long as the producer's timeout is a valid one, especially given that we do not have any means for the broker and client to negotiate settings, whether that is timeouts or message sizes or anything else. There are a lot of things that you have to know when setting up a Kafka client about what
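Todd's handshake idea, the broker stating its limits up front, might look something like the sketch below. Every field and method here is hypothetical; no such negotiation API existed in Kafka at the time of this discussion.

```java
// Hypothetical broker -> client "limits" handshake, in the spirit of the
// password-form complaint above: tell the client the rules before it sends.
public class BrokerLimits {
    private final long minRequestTimeoutMs;
    private final int maxMessageBytes;

    public BrokerLimits(long minRequestTimeoutMs, int maxMessageBytes) {
        this.minRequestTimeoutMs = minRequestTimeoutMs;
        this.maxMessageBytes = maxMessageBytes;
    }

    // Client-side pre-flight check: fail fast with a clear reason instead of
    // discovering the mismatch via a rejected request later.
    public String validate(long requestTimeoutMs, int messageBytes) {
        if (requestTimeoutMs < minRequestTimeoutMs)
            return "request timeout below broker minimum of " + minRequestTimeoutMs + " ms";
        if (messageBytes > maxMessageBytes)
            return "message exceeds broker maximum of " + maxMessageBytes + " bytes";
        return null; // settings are acceptable
    }
}
```

With something like this, a client with a pathologically low request timeout would learn about the broker's minimum at connection time, rather than having the broker silently ignore the setting when applying quota delays.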
Re: [KIP-DISCUSSION] KIP-13 Quotas
Actually I think we are making the same argument. request.timeout.ms seems like it is a timeout after which your request will be sent back as an error. Actually this is not true. There are many many things that can cause a request to be slow--sitting in queue on the server, waiting on slow I/O, GC pause, network layer overload, etc etc. We actually don't and can't handle ANY of these scenarios--it is a promise that is simply impossible to enforce on the server side. The only scenario we do handle (and need to handle) is the really replication.timeout, which is the actual meaning of that config today. It actually means how long should we wait for replication to complete before giving up. With this definition I think the problem goes away. I agree about the various size mismatches which are terrible. I do think that is fixable even without a handshake, though. -Jay On Thu, Mar 12, 2015 at 9:21 AM, Todd Palino tpal...@gmail.com wrote: We're getting off the KIP a little bit here, but while I understand the idea of not honoring the request timeout, I think that the broker ignoring it without telling the client is not a good implementation. It gets back to a larger discussion, which I was mentioning, of just negotiating with (or at least notifying) the client of important configuration details for the cluster. Any configuration, like a minimum timeout value or the maximum message size, which have to be configured the same on both the broker and the client, or where the client has a required minimum value for a setting (like fetch size), should be clearly stated by the broker in a handshake. Don't you get frustrated when you sign up for an account on a website, select a nice secure password, and then get told after submission Your password is invalid - it must be between 6 and 14 characters. Then on the next submission you get told Your password is invalid - it can only contain certain symbols, and they don't tell you what symbols are allowed? 
Why didn't they just tell you all that up front so you could get it right the first time? -Todd On Wed, Mar 11, 2015 at 10:22 PM, Jay Kreps jay.kr...@gmail.com wrote:
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hi Bhavesh, One of the ideas behind this quota proposal is that we want to avoid enforcing quotas on the client side (both producer and consumer). We want to make it easy for people to write new clients if they choose to, and implementing quotas is tricky enough without having to do it bug-free in several existing clients. As you called out, this works well if we have a bounded number of producers and know what to configure the quota values to. If we are running Kafka as a service then we cannot reason about potentially thousands of clients. Thanks, Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Thursday, March 12, 2015 10:35 AM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hi Aditya, I just wanted to give you a use case of rate limiting that we have implemented with the producer as a workaround: Use Case 1: 1) Topic-based rate limiting per producer instance (not across multiple instances of producers yet). We have a producer through which we send both heartbeat and regular messages, and we do not want to rate limit the HB (which is very important data about the health of the application; it is a periodic message and does not depend on site traffic). 2) The major goal was to prevent network saturation by the Kafka producer (so limit it at the producer before messages are sent across the network to the brokers; the broker rejecting them offers no protection to the network itself). 3) Reset the quota limit per minute (not per second), changeable while the producer instance is running via configuration management and without impacting the producer life-cycle. The design/implementation issues are: 1) Quota enforcement is per producer instance (if an application team really wants more quota they can just create multiple instances of the producer as a workaround, which defeats the purpose of creating the quota in my opinion). 2) Before inserting the message into the Kafka memory queue, we count the number of bytes (un-compressed bytes), hence we do not have very accurate accounting of the quota. For the producer quota limit, I think we need to consider the use case where people just want to limit data at the producer and be able to control it regardless of the number of producer instances for the same topic within the same JVM and same class loader (like a Tomcat container). Use Case 2: Based on message key: for example, if you are building a LinkedIn-style distributed tracing solution, you need sampling based on hash(key) % 100: if the key falls in the 10% sample then send, else reject it. (Although you could consider this application-specific, or a pluggable quota, or a sampling/selection of messages which the app can do prior to producer.send(), it is nonetheless another use case on the producer side.) Let me know your thoughts and suggestions. 
Thanks, Bhavesh On Wed, Mar 11, 2015 at 10:22 PM, Jay Kreps jay.kr...@gmail.com wrote:
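For illustration, the client-side workaround Bhavesh describes (a per-topic byte quota enforced before send, an exempt heartbeat topic, and a window that resets every minute) could look roughly like the sketch below. All class and topic names here are hypothetical; this is not actual Kafka producer code.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicRateLimiter {
    private final long bytesPerWindow;  // quota per topic per window
    private final long windowMs;        // e.g. 60_000 for a per-minute reset
    // topic -> {windowStartMs, bytesCounted}
    private final Map<String, long[]> state = new HashMap<>();

    public TopicRateLimiter(long bytesPerWindow, long windowMs) {
        this.bytesPerWindow = bytesPerWindow;
        this.windowMs = windowMs;
    }

    /** Returns true if the (uncompressed) payload may be sent now. */
    public synchronized boolean tryAcquire(String topic, int payloadBytes, long nowMs) {
        if (topic.equals("heartbeat"))   // heartbeat traffic is never throttled
            return true;
        long[] s = state.computeIfAbsent(topic, t -> new long[]{nowMs, 0});
        if (nowMs - s[0] >= windowMs) {  // window expired: reset the count
            s[0] = nowMs;
            s[1] = 0;
        }
        if (s[1] + payloadBytes > bytesPerWindow)
            return false;                // over quota: caller drops or queues
        s[1] += payloadBytes;
        return true;
    }
}
```

Note that, exactly as Bhavesh points out, this limits each producer instance independently, so multiple instances in the same JVM can still exceed the intended aggregate quota.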
Re: [KIP-DISCUSSION] KIP-13 Quotas
: this allows us to quota on bytes in/out vs total number of requests in/out. It also encapsulates the logic to calculate delay. Recording the metrics in KafkaApis would make it look more complex. try { bytesPerSecondMetrics.record(newVal); produceRequestsPerSecondMetrics.record(newVal); } catch (QuotaException e) { // logic to calculate delay requestThrottler.add(new DelayedResponse(...), ...) } Stepping back a bit, I see that this is all internal to KafkaApis and we have the flexibility of refactoring this as we see fit at a later date. It should be OK for RequestThrottler and QuotaManager to be proper implementations rather than interfaces. 6. As for assigning exact delays to requests, I agree that having a 30 second delay is rather bad. I was thinking that we could have 5 second windows instead. Even if a client exhausts their quota in the first 2 seconds, they will only be delayed for another 3 seconds. Is there a reason this isn't good enough? I'm also going to dig into the existing quota package and see what we can leverage. 7. Exposing quota usage: We do plan to expose the quota metrics via JMX. Adding an API to query quota status is absolutely feasible and also a good idea. Thanks, Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Monday, March 09, 2015 5:01 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Hey Todd, Nice points, let me try to respond: Plugins Yeah let me explain what I mean about plugins. The contracts that matter for us are public contracts, i.e. the protocol, the binary format, stuff in zk, and the various plug-in apis we expose. Adding an internal interface later will not be hard--the quota check is going to be done in 2-6 places in the code which would need to be updated, all internal to the broker. The challenge with making things pluggable up front is that the policy is usually fairly trivial to plug in but each policy requires different inputs--the number of partitions, different metrics, etc. 
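Aditya's point 6 above (5-second windows, so a client that exhausts its quota 2 seconds into a window is only delayed for the remaining 3 seconds) can be sketched as follows. The names are illustrative stand-ins, not broker code.

```java
// Minimal sketch of a fixed-window quota whose violation penalty is
// "hold the response until the current window rolls over".
public class WindowedQuota {
    private final long quotaBytes;   // allowed bytes per window
    private final long windowMs;     // e.g. 5_000 for 5-second windows
    private long windowStartMs = 0;
    private long bytesInWindow = 0;

    public WindowedQuota(long quotaBytes, long windowMs) {
        this.quotaBytes = quotaBytes;
        this.windowMs = windowMs;
    }

    /**
     * Records the request and returns how long the response should be
     * delayed: 0 while under quota, otherwise the time left until the
     * current (epoch-aligned) window ends.
     */
    public long recordAndGetDelayMs(long bytes, long nowMs) {
        if (nowMs - windowStartMs >= windowMs) {      // start a fresh window
            windowStartMs = nowMs - (nowMs % windowMs);
            bytesInWindow = 0;
        }
        bytesInWindow += bytes;
        if (bytesInWindow <= quotaBytes)
            return 0;
        return windowStartMs + windowMs - nowMs;      // hold until window ends
    }
}
```

With a 100-byte quota and 5-second windows, exhausting the quota at t=0 and sending again at t=2s yields a 3-second delay, matching the example in the message.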
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Todd, Yeah it is kind of weird to do the quota check after taking a request, but since the penalty is applied during that request and it just delays you to the right rate, I think it isn't exactly wrong. I admit it is weird, though. What you say about closing the connection makes sense. The issue is that our current model for connections is totally transient. The clients are supposed to handle any kind of transient connection loss and just re-establish. So basically all existing clients would likely just retry all the same whether you closed the connection or not, so at the moment there would be no way to know a retried request is actually a retry. Your point about the REST proxy is a good one, I don't think we had considered that. Currently the java producer just has a single client.id for all requests so the rest proxy would be a single client. But actually what you want is the original sender to be the client. This is technically very hard to do because the client will actually be batching records from all senders together into one request so the only way to get the client id right would be to make a new producer for each rest proxy client and this would mean a lot of memory and connections. This needs thought, not sure what solution there is. I am not 100% convinced we need to obey the request timeout. The configuration issue actually isn't a problem because the request timeout is sent with the request so the broker actually knows it now even without a handshake. However the question is, if someone sets a pathologically low request timeout do we need to obey it? and if so won't that mean we can't quota them? I claim the answer is no! I think we should redefine request timeout to mean replication timeout, which is actually what it is today. Even today if you interact with a slow server it may take longer than that timeout (say because the fs write queues up for a long-ass time). 
I think we need a separate client timeout which should be fairly long and unlikely to be hit (default to 30 secs or something). -Jay On Tue, Mar 10, 2015 at 10:12 AM, Todd Palino tpal...@gmail.com wrote: Thanks, Jay. On the interface, I agree with Aditya (and you, I believe) that we don't need to expose the public API contract at this time, but structuring the internal logic to allow for it later with low cost is a good idea. Glad you explained the thoughts on where to hold requests. While my gut reaction is to not like processing a produce request that is over quota, it makes sense to do it that way if you are going to have your quota action be a delay. On the delay, I see your point on the bootstrap cases. However, one of the places I differ, and part of the reason that I prefer the error, is that I would never allow a producer who is over quota to resend a produce request. A producer should identify itself at the start of its connection, and at that point if it is over quota, the broker would return an error and close the connection. The same goes for a consumer. I'm a fan, in general, of pushing all error cases and handling down to the client and doing as little special work to accommodate those cases on the broker side as possible. A case to consider here is what does this mean for REST endpoints to Kafka? Are you going to hold the HTTP connection open as well? Is the endpoint going to queue and hold requests? I think the point that we can only delay as long as the producer's timeout is a valid one, especially given that we do not have any means for the broker and client to negotiate settings, whether that is timeouts or message sizes or anything else. There are a lot of things that you have to know when setting up a Kafka client about what your settings should be, when much of that should be provided for in the protocol handshake. 
It's not as critical in an environment like ours, where we have central configuration for most clients, but we still see issues with it. I think being able to have the client and broker negotiate a minimum timeout allowed would make the delay more palatable. I'm still not sure it's the right solution, and that we're not just going with what's fast and cheap as opposed to what is good (or right). But given the details of where to hold the request, I have less of a concern with the burden on the broker. -Todd On Mon, Mar 9, 2015 at 5:01 PM, Jay Kreps jay.kr...@gmail.com wrote:
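Jay's framing that the penalty "just delays you to the right rate" amounts to: hold the response until the bytes observed in the sample window would have taken exactly quota-rate time to send. A hypothetical sketch of that calculation (not the actual broker implementation):

```java
public class RateMatchingDelay {
    /**
     * How long to hold the response so the client's observed throughput
     * over the sample window comes out at the quota rate.
     */
    public static long throttleTimeMs(long windowBytes, long windowElapsedMs,
                                      long quotaBytesPerSec) {
        if (quotaBytesPerSec <= 0)
            return 0;                                  // no quota configured
        // time the window *should* have taken at exactly the quota rate
        long targetMs = windowBytes * 1000 / quotaBytesPerSec;
        return Math.max(0, targetMs - windowElapsedMs);
    }
}
```

For example, a client that pushed 2000 bytes in 1 second against a 1000 B/s quota would be held for a further second, bringing its effective rate back to the quota.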
Re: [KIP-DISCUSSION] KIP-13 Quotas
First, a couple notes on this... 3 - I generally agree with the direction of not pre-optimizing. However, in this case I'm concerned about the calculation of the cost of doing plugins now vs. trying to refactor the code to do it later. It would seem to me that doing it up front will have less friction. If we wait to do plugins later, it will probably mean changing a lot of related code which will be significantly more work. We've spent a lot of time talking about various implementations, and I think it not unreasonable to believe that what one group wants initially is not going to solve even most cases, as it will vary by use case. 4 - I really disagree with this. Slowing down a request means that you're going to hold onto it in the broker. This takes up resources and time, and is generally not the way other services handle quota violations. In addition you are causing potential problems with the clients by taking a call that's supposed to return as quickly as possible and making it take a long time. This increases latency and deprives the client of the ability to make good decisions about what to do. By sending an error back to the client you inform them of what the problem is, and you allow the client to make an intelligent decision, such as queuing to send later, sending to another resource, or handling anything from their upstreams differently. You're absolutely right that throwing back an immediate error has the potential to turn a quota violation into a different problem for a badly behaved client. But OS and upstream networking tools can see a problem based on a layer 4 issue (rapidly reconnecting client) rather than layers above. Out of the options provided, I think A is the correct choice. B seems to be the most work (you have the delay, and the client still has to handle errors and backoff), and C is what I disagree with doing. I would also like to see a provision for allowing the client to query its quota status within the protocol. 
I think we should allow for a request (or information within an existing response) where the client can ask what its current quota status is. This will allow for the clients to manage their quotas, and it will allow for emitting metrics on the client side for quota status (rather than relying on the server-side metrics, which tends to put the responsibility in the wrong place). -Todd On Sun, Mar 8, 2015 at 10:52 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Adi, Great write-up. Here are some comments: 1. I don't think you need a way to disable quotas on a per-client basis, that is just the equivalent of setting the quota to be infinite, right? 2. I agree that the configuration problem is a general part of doing dynamic configuration, and it is right to separate that into the config KIP. But Joe's proposal currently doesn't provide nearly what you need in its current form--it doesn't even handle client-id based configuration, let alone the notification mechanism you would need to update your quota--so we really need to spell out completely and explicitly how that KIP is going to solve this problem. 3. Custom quota implementations: let's do this later. Pluggability comes with a high cost and we want to try really hard to avoid it. So in the future if we have a really solid case for an alternative quota approach let's see if we can't improve the current approach and stick with one good implementation. If we really can't then let's add a plugin system. I think doing it now is premature. 4. I think the ideal quota action from the user's point of view is just to slow down the writer or reader transparently to match their capacity allocation. Let's try to see if we can make that work. I think immediate error can be ruled out entirely because it depends on the client properly backing off. In cases where they don't we may actually make things worse. Given the diversity of clients I think this is probably not going to happen. 
The only downside to just delaying the request that was pointed out was that if the delay exceeded the request timeout the user might retry. This is true but it applies to any approach that delays requests (both B and C). I think with any sane request timeout and quota the per request delay we induce will be way lower (otherwise you would be hitting the timeout all the time just due to linux I/O variance, in which case you can't really complain). 5. We need to explain the relationship between the quota stuff in the metrics package and this. We need to either remove that stuff or use it. We can't have two quota things. Since quotas fundamentally apply to windowed metrics, I would suggest doing whatever improvements to that to make it usable for quotas. 6. I don't think the quota manager interface is really what we need if I'm understanding it correctly. You give a method <T extends RequestOrResponse> boolean check(T request); But how would you implement this method? It seems like it would basically internally
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Todd, Nice points, let me try to respond: Plugins Yeah let me explain what I mean about plugins. The contracts that matter for us are public contracts, i.e. the protocol, the binary format, stuff in zk, and the various plug-in apis we expose. Adding an internal interface later will not be hard--the quota check is going to be done in 2-6 places in the code which would need to be updated, all internal to the broker. The challenge with making things pluggable up front is that the policy is usually fairly trivial to plug in but each policy requires different inputs--the number of partitions, different metrics, etc. Once we give a public api it is hard to add to it without breaking the original contract and hence breaking everyone's plugins. So if we do this we want to get it right early if possible. In any case I think whether we want to design a pluggable api or just improve a single implementation, the work we need to do is the same: brainstorm the set of use cases the feature has and then figure out the gap in our proposed implementation that leaves some use case uncovered. Once we have these specific cases we can try to figure out if that could be solved with a plugin or by improving our default proposal. Enforcement I started out arguing your side (immediate error), but I have switched to preferring delay. Here is what convinced me, let's see if it moves you. First, the delay quota need not hold onto any request data. The produce response can be delayed after the request is completed and the fetch can be delayed prior to the fetch being executed. So no state needs to be maintained in memory, other than a single per-connection token. This is a really important implementation detail for large scale usage that I didn't realize at first. I would agree that maintaining a request per connection in memory is a non-starter for an environment with 10s of thousands of connections. 
The second argument is that I think this really expands the use cases where the quotas can be applicable. The use case I have heard people talk about is event collection from apps. In this use case the app is directly sending data at a more or less steady state and never really has a performance spike unless the app has a bug or the application itself experiences more traffic. So in this case you should actually never hit the quota, and if you do, the data is going to be dropped, whether it is dropped by the server with an error or by the client. These use cases will never block the app (which would be dangerous) since the client is always non-blocking and drops data when its buffer is full rather than blocking. I agree that for this use case either server-side delay or client side delay are both reasonable--the pro of a server-side delay is that it doesn't require special client handling, the pro of the server-side error is that it is more transparent. But now consider non-steady-state use cases. Here I am thinking of: 1. Data load from Hadoop 2. MM load into a cluster with live usage 3. Database changelog capture 4. Samza Each of these has a case where it is catching up or otherwise slammed by load from the source system: 1. An M/R job dumps a ton of data all at once 2. MM when catching up after some downtime 3. Database changelog will have a load phase when populating data for the first time 4. Samza when restoring state or catching up after fail-over In each of these cases you want the consumer or producer to go as fast as possible but not impact the other users of the cluster. In these cases you are actually using the quotas totally differently. In the app event capture use case the quota was more like a safety valve that you expected to never hit. However in these cases I just listed you fully expect to hit and remain at the quota for extended periods of time and that will be totally normal. 
These are the cases where throttling throughput is better than sending an error. If we send an error then any producer who doesn't configure enough retries is going to start losing data. Further even if you set infinite retries the retry itself is going to mean resending the data over and over until you get a non-error. This is bad because in a period of high load you are then going to be incurring more network load as lots of producers start retrying (this isn't a problem on the consumer because the fetch request is small but is an issue on the producer). I take your point about the potential danger of slowing down a producing app that is configured to block. But actually this danger is no different than what will happen if it exceeds the node capacity now--when that happens requests will start getting slow and the app will block. The only difference is that that limit is now lower than when the node's capacity is totally exhausted. So I don't think that is a new danger. Exposing quota usage I agree we should make this available, good use of this feature obviously means knowing how close you are to your quota before you hit it.
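As a toy illustration of the kind of number that could be exposed (via JMX or a query API, per Aditya's point 7 earlier in the thread) so a client knows how close it is to its quota before hitting it; the class and method names are hypothetical:

```java
public class QuotaStatus {
    /**
     * Fraction of the quota consumed in the current window.
     * Values above 1.0 indicate the client is currently violating its quota.
     */
    public static double utilization(long observedBytes, long quotaBytes) {
        if (quotaBytes <= 0)
            return 0.0;                 // no quota configured: report unused
        return (double) observedBytes / quotaBytes;
    }
}
```

A client-side metric like this lets the application back off proactively (e.g. at 0.8 utilization) instead of discovering the quota only through delayed or rejected requests.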
RE: [KIP-DISCUSSION] KIP-13 Quotas
Thanks for the comments Jay and Todd. Firstly, I'd like to address some of Jay's points.

1. You are right that we don't need to disable quotas on a per-client basis.

2. Configuration management: I must admit, I haven't read Joe's proposal in detail. My thinking was that we can keep configuration management separate from this discussion since this is already quite a meaty topic. Let me spend some time reading that KIP and then I can add more detail to the quota KIP.

3. Custom quota implementations: I don't think it is necessarily a bad idea to have an interface called QuotaManager (or RequestThrottler). This doesn't necessarily mean exposing the interface as a public API; it is a mechanism to limit code changes to 1-2 specific classes, and it prevents quota logic from bleeding into multiple places in the code, as happens in any big piece of code. I fully agree that we should not expose this as a public API unless there is a very strong reason to. This seems to be more of an implementation detail.

4. Metrics package: I'll add a section on the wiki about using things from the metrics package. Currently, the quota stuff is located in clients/common/metrics. This means that we will have to migrate all that functionality into core. Does this also mean that we will need to replace the existing metrics code in core with the newly imported package as part of this project? If so, that's a relatively large undertaking and it needs to be discussed separately IMO.

5. RequestThrottler vs QuotaManager: I wanted my quota manager to do something similar to what you proposed. Inside KafkaApis, I could do:

  if (quotaManager.check())
    // process request
  else
    return

Internally, QuotaManager.check() could do exactly what you suggested:

  try {
    quotaMetric.record(newVal)
  } catch (QuotaException e) {
    // logic to calculate delay
    requestThrottler.add(new DelayedResponse(...), ...)
    return
  }

This approach gives us the flexibility of deciding what metric we want to record inside QuotaManager.
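To make the shape of this control flow concrete, here is a minimal, self-contained Java sketch. All names (QuotaManager, QuotaException, the "throttled" list standing in for a real delay-queue throttler) are hypothetical stand-ins from this thread, not actual Kafka classes:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the QuotaManager.check() flow described above.
class QuotaException extends RuntimeException {}

class QuotaManager {
    private final double quotaPerWindow;
    private double usedThisWindow;                    // trivially windowed, for illustration
    final List<Object> throttled = new ArrayList<>(); // stub for a real request throttler

    QuotaManager(double quotaPerWindow) {
        this.quotaPerWindow = quotaPerWindow;
    }

    // true  -> caller (e.g. KafkaApis) processes the request immediately
    // false -> the response has been handed off for delayed sending
    boolean check(Object response, double units) {
        try {
            record(units);
            return true;
        } catch (QuotaException e) {
            // A real implementation would compute a delay here and hand the
            // response to a delay-queue based throttler.
            throttled.add(response);
            return false;
        }
    }

    private void record(double units) {
        usedThisWindow += units;
        if (usedThisWindow > quotaPerWindow) throw new QuotaException();
    }
}
```

The point of the sketch is the split: the metric being recorded and the over-quota reaction live inside QuotaManager, while the caller only sees a boolean.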
This brings us back to the same discussion of pluggable quota policies. It's a bit hard to articulate, but for example: this allows us to quota on bytes in/out vs. total number of requests in/out. It also encapsulates the logic to calculate the delay. Recording the metrics in KafkaApis directly would make it look more complex:

  try {
    bytesPerSecondMetrics.record(newVal);
    produceRequestsPerSecondMetrics.record(newVal);
  } catch (QuotaException e) {
    // logic to calculate delay
    requestThrottler.add(new DelayedResponse(...), ...)
  }

Stepping back a bit, I see that this is all internal to KafkaApis, and we have the flexibility of refactoring it as we see fit at a later date. It should be OK for RequestThrottler and QuotaManager to be proper implementations rather than interfaces for now.

6. As for assigning exact delays to requests, I agree that having a 30 second delay is rather bad. I was thinking that we could have 5 second windows instead. Even if a client exhausts their quota in the first 2 seconds, they will only be delayed for another 3 seconds. Is there a reason this isn't good enough? I'm also going to dig into the existing quota package and see what we can leverage.

7. Exposing quota usage: We do plan to expose the quota metrics via JMX. Adding an API to query quota status is absolutely feasible and also a good idea.

Thanks,
Aditya

From: Jay Kreps [jay.kr...@gmail.com]
Sent: Monday, March 09, 2015 5:01 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hey Todd,

Nice points, let me try to respond:

Plugins: Yeah, let me explain what I mean about plugins. The contracts that matter for us are public contracts, i.e. the protocol, the binary format, stuff in ZK, and the various plug-in APIs we expose. Adding an internal interface later will not be hard--the quota check is going to be done in 2-6 places in the code which would need to be updated, all internal to the broker.
The challenge with making things pluggable up front is that the policy is usually fairly trivial to plug in, but each policy requires different inputs--the number of partitions, different metrics, etc. Once we expose a public API it is hard to add to it without breaking the original contract and hence breaking everyone's plugins. So if we do this we want to get it right early if possible. In any case, whether we want to design a pluggable API or just improve a single implementation, the work we need to do is the same: brainstorm the set of use cases the feature has, then figure out the gaps in our proposed implementation that leave some use case uncovered. Once we have these specific cases we can try to figure out whether they could be solved with a plugin or by improving our default proposal.

Enforcement: I started out arguing your side (immediate error), but I have switched to preferring delay. Here is what convinced me; let's see if it moves you. First
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Adi,

Great write-up. Here are some comments:

1. I don't think you need a way to disable quotas on a per-client basis; that is just the equivalent of setting the quota to be infinite, right?

2. I agree that the configuration problem is a general part of doing dynamic configuration, and it is right to separate that into the config KIP. But Joe's proposal currently doesn't provide nearly what you need in its current form--it doesn't even handle client-id based configuration, let alone the notification mechanism you would need to update your quota--so we really need to spell out completely and explicitly how that KIP is going to solve this problem.

3. Custom quota implementations: let's do this later. Pluggability comes with a high cost and we want to try really hard to avoid it. So in the future, if we have a really solid case for an alternative quota approach, let's first see if we can't improve the current approach and stick with one good implementation. If we really can't, then let's add a plugin system. I think doing it now is premature.

4. I think the ideal quota action from the user's point of view is just to slow down the writer or reader transparently to match their capacity allocation. Let's try to see if we can make that work. I think immediate error can be ruled out entirely because it depends on the client properly backing off. In cases where clients don't, we may actually make things worse. Given the diversity of clients, I think this is probably not going to happen. The only downside to just delaying the request that was pointed out was that if the delay exceeded the request timeout the user might retry. This is true, but it applies to any approach that delays requests (both B and C). I think with any sane request timeout and quota, the per-request delay we induce will be way lower (otherwise you would be hitting the timeout all the time just due to Linux I/O variance, in which case you can't really complain).

5.
We need to explain the relationship between the quota stuff in the metrics package and this proposal. We need to either remove that stuff or use it; we can't have two quota things. Since quotas fundamentally apply to windowed metrics, I would suggest making whatever improvements to that package are needed to make it usable for quotas.

6. I don't think the quota manager interface is really what we need, if I'm understanding it correctly. You give a method

  <T extends RequestOrResponse> boolean check(T request);

But how would you implement this method? It seems like it would basically just be a switch statement internally, with a different check for each request type. So this is a pretty terrible object-oriented API, right? It seems like what we would be doing is taking code that would otherwise just be in the request handling flow and moving it into this method, with a bunch of instanceof checks.

I think what we need is just a delay queue and a background thread that sends the delayed responses (we were calling it a purgatory, but it isn't--it is just a timeout-based delay; there are no watchers or keys or any of that). Let's rename the QuotaManager to RequestThrottler and have it expose a single method:

  class RequestThrottler {
    sendDelayedResponse(response, delay, timeunit)
  }

Internally it will put the response into the delay queue, and a background thread will send out those responses after the delay elapses. Usage in KafkaApis would look like:

  try {
    quotaMetric.record(newVal)
  } catch (QuotaException e) {
    requestThrottler.add(new DelayedResponse(...), ...)
    return
  }

The advantage of this is that the logic of what metric is being checked and the logic of how to appropriately correct the response, both of which will be specific to each request, remain in KafkaApis where they belong. The throttler just delays the sending of the response for the appropriate time and has no per-request logic whatsoever.

7.
We need to think through and state the exact algorithm for how we will assign delays to requests from a client that is over its quota. That is closely tied to how we calculate the metric used. Here is a bad approach we should not use:

  a. Measure in a 30 second window.
  b. When we have hit the cap in that window, delay for the remainder of the 30 seconds.

As you can imagine, with this bad algorithm you might use all server resources for 5 seconds, then suddenly assign a 25 second delay to the next request from that client, then the window would reset and this would repeat. The quota package is already doing a good job of the windowed metrics, but we'll want to integrate the backoff calculation with that algorithm (assuming that is what we are using).

Cheers,

-Jay

On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

  Posted a KIP for quotas in kafka.
  https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
  Appreciate any feedback.

  Aditya
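For illustration only, here is a self-contained Java sketch combining the delay-queue throttler from point 6 above with one possible delay calculation for point 7 that avoids the window-remainder behavior: the response is delayed just long enough that the observed bytes, spread over window-plus-delay, fall back to the quota rate. Class and method names follow the thread's terminology but this is not actual Kafka code:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a delay-queue based RequestThrottler with a
// background sender thread, plus an overage-proportional delay formula.
class DelayedResponse implements Delayed {
    final Runnable send;       // action that writes the response to the client
    final long sendAtNanos;

    DelayedResponse(Runnable send, long delayMs) {
        this.send = send;
        this.sendAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(sendAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class RequestThrottler {
    private final DelayQueue<DelayedResponse> queue = new DelayQueue<>();

    RequestThrottler() {
        Thread sender = new Thread(() -> {
            try {
                while (true) queue.take().send.run(); // blocks until a delay elapses
            } catch (InterruptedException e) { /* shutdown */ }
        });
        sender.setDaemon(true);
        sender.start();
    }

    void sendDelayedResponse(Runnable send, long delayMs) {
        queue.put(new DelayedResponse(send, delayMs));
    }

    // Delay just long enough that observedBytes over (window + delay) falls
    // back to the quota, rather than stalling to the window boundary.
    static long delayMs(long windowMs, long observedBytes, double quotaBytesPerSec) {
        double targetMs = observedBytes * 1000.0 / quotaBytesPerSec;
        return Math.max(0, Math.round(targetMs - windowMs));
    }
}
```

With a 5 second window and a 100 B/s quota, a client that pushed 1000 bytes (which should have taken 10 seconds at quota) gets a 5 second delay, while a client under quota gets none; the penalty grows smoothly with the overage instead of jumping to the window remainder.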
[KIP-DISCUSSION] KIP-13 Quotas
Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya