Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
here are my proposed changes: https://github.com/radai-rosenblatt/kafka/commit/8d7744ab8a6c660c4749b495b033b948a68efd3c

at this point I've run this code on a test cluster under load that OOMs "vanilla" 0.10.1.0 and verified that my code deployed under the same conditions remains stable. what I've done:

1. configured max heap size to 1.5GB and a single io thread (makes it easier to DOS)
2. set up a topic with 100 partitions all on the same broker (makes it easier to focus IO) - ./kafka-topics.sh --zookeeper --create --topic dos --replica-assignment [100 times the same broker id]
3. spun up load from 10 machines - ./kafka-producer-perf-test.sh --topic dos --num-records 100 --record-size 991600 --throughput 10 --producer-props bootstrap.servers= max.request.size=104857600 acks=0 linger.ms=3 buffer.memory=209715200 batch.size=1048576

this results in single requests that are just under 100MB in size, times 10 producers for a ~1GB maximum outstanding memory requirement. on my setup this was enough to reliably DOS 0.10.1.0. under my patch the broker held up (the request rate was throttled). performance when not under memory load was roughly the same (note that the longest run was ~1 hour; I haven't done long-term stress tests yet).

At this point I think I've addressed most (all?) of the concerns and would like to move on to a vote? (obviously the code has not been reviewed yet, but in terms of high-level approach and changes to public API the KIP is ready)

On Sun, Oct 30, 2016 at 5:05 PM, radai wrote:

> Hi Jun,
>
> the benchmarks just spawn 16 threads where each thread allocates a chunk of memory from the pool and immediately releases it. 16 was chosen because it's typical for LinkedIn setups. the benchmarks never "consume" more than 16 * [single allocation size] and so do not test out-of-memory performance, but rather "normal" operating conditions. tests were run with 4 memory allocation sizes - 1k, 10k, 100k and 1M (1M being the largest typical single-request size setting at LinkedIn). the results are in ops/sec (for context - a single request involves a single allocation/release cycle; typical LinkedIn setups don't go beyond 20k requests/sec on a single broker).
>
> the results show that the GC pool (which is a combination of an AtomicLong outstanding-bytes count + weak references for allocated buffers) has a negligible performance cost vs the simple benchmark (which does nothing, same as the current code).
>
> the more interesting thing the results show is that as the requested buffer size gets larger, a single allocate/release cycle becomes more expensive. since the benchmark never holds a lot of outstanding memory (16 * buf size tops) I suspect the issue is memory fragmentation - it's harder to find larger contiguous chunks of heap.
>
> this indicates that for throughput scenarios (large request batches) broker performance may actually be impacted by the overhead of allocating and releasing buffers (the situation may even be worse - inter-broker requests are much larger), and an implementation of a memory pool that actually recycles buffers (mine just acts as a limiter and leak detector) might improve broker performance under high-throughput conditions (but that's probably a separate followup change).
>
> I expect to stress test my code this week (though no guarantees).
>
> I'll look at KIP-81.
>
> On Sun, Oct 30, 2016 at 12:27 PM, Jun Rao wrote:
>
>> Hi, Radai,
>>
>> Sorry for the late response. How should the benchmark results be interpreted? The higher the ops/s, the better?
>> It would also be useful to test this out on LinkedIn's traffic with enough socket connections to see if there is any performance degradation.
>>
>> Also, there is a separate proposal, KIP-81, to bound the consumer memory usage. Perhaps you can chime in there on whether this proposal can be utilized there too.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 27, 2016 at 10:23 AM, radai wrote:
>>
>> > Hi Jun,
>> >
>> > 10 - mute/unmute functionality has been added in https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. I have yet to run stress tests to see how it behaves versus without muting
>> >
>> > 11 - I've added a SimplePool implementation (nothing more than an AtomicLong really) and compared it with my GC pool (that uses weak refs) - https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks. the results show no noticeable difference. what the results _do_ show though is that for large requests (1M) performance drops very sharply. since the SimplePool is essentially identical to current kafka code behaviour (the benchmark never reaches out-of-memory conditions) it would suggest to me that kafka performance for large requests suffers greatly from the cost of allocating (and releasing) large buffers (instead of actually pooling them for later re-use).
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi Jun,

the benchmarks just spawn 16 threads where each thread allocates a chunk of memory from the pool and immediately releases it. 16 was chosen because it's typical for LinkedIn setups. the benchmarks never "consume" more than 16 * [single allocation size] and so do not test out-of-memory performance, but rather "normal" operating conditions. tests were run with 4 memory allocation sizes - 1k, 10k, 100k and 1M (1M being the largest typical single-request size setting at LinkedIn). the results are in ops/sec (for context - a single request involves a single allocation/release cycle; typical LinkedIn setups don't go beyond 20k requests/sec on a single broker).

the results show that the GC pool (which is a combination of an AtomicLong outstanding-bytes count + weak references for allocated buffers) has a negligible performance cost vs the simple benchmark (which does nothing, same as the current code).

the more interesting thing the results show is that as the requested buffer size gets larger, a single allocate/release cycle becomes more expensive. since the benchmark never holds a lot of outstanding memory (16 * buf size tops) I suspect the issue is memory fragmentation - it's harder to find larger contiguous chunks of heap.

this indicates that for throughput scenarios (large request batches) broker performance may actually be impacted by the overhead of allocating and releasing buffers (the situation may even be worse - inter-broker requests are much larger), and an implementation of a memory pool that actually recycles buffers (mine just acts as a limiter and leak detector) might improve broker performance under high-throughput conditions (but that's probably a separate followup change).

I expect to stress test my code this week (though no guarantees).

I'll look at KIP-81.

On Sun, Oct 30, 2016 at 12:27 PM, Jun Rao wrote:

> Hi, Radai,
>
> Sorry for the late response. How should the benchmark results be interpreted? The higher the ops/s, the better? It would also be useful to test this out on LinkedIn's traffic with enough socket connections to see if there is any performance degradation.
>
> Also, there is a separate proposal, KIP-81, to bound the consumer memory usage. Perhaps you can chime in there on whether this proposal can be utilized there too.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>
> Thanks,
>
> Jun
>
> On Tue, Sep 27, 2016 at 10:23 AM, radai wrote:
>
> > Hi Jun,
> >
> > 10 - mute/unmute functionality has been added in https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. I have yet to run stress tests to see how it behaves versus without muting
> >
> > 11 - I've added a SimplePool implementation (nothing more than an AtomicLong really) and compared it with my GC pool (that uses weak refs) - https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks. the results show no noticeable difference. what the results _do_ show though is that for large requests (1M) performance drops very sharply. since the SimplePool is essentially identical to current kafka code behaviour (the benchmark never reaches out-of-memory conditions) it would suggest to me that kafka performance for large requests suffers greatly from the cost of allocating (and releasing) large buffers (instead of actually pooling them for later re-use).
> > this means that an implementation of a memory pool that actually pools ( :-) ) is very likely to improve broker performance for large requests.
> >
> > 12 - if there was a single thread iterating over selection keys then stopping at the 1st unsatisfiable request might work (if iteration order over selection keys is deterministic, which is OS-dependent). however, kafka spawns multiple selectors sharing the same pool so I doubt the approach would gain anything. also notice that the current code already shuffles the selection keys if memory is low (<10%) to try and guarantee fairness.
> >
> > attached the benchmark results for the pool implementations:
> >
> > Benchmark                                        Mode  Cnt        Score        Error  Units
> > GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
> > GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
> > GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
> > GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
> > SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
> > SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
> > SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
> > SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s
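A minimal sketch of the allocate/release loop those benchmarks measure (16 threads, immediate release, so at most 16 * bufSize bytes are ever outstanding). The real harness at the linked repo uses JMH; the two-method pool API below is an assumption for illustration (a fuller interface sketch appears at the end of the thread):

    import java.nio.ByteBuffer;

    // Assumed pool API for this sketch (see the linked benchmark repo for the real code).
    interface MemoryPool {
        ByteBuffer tryAllocate(int sizeBytes); // non-blocking: null if no memory available
        void release(ByteBuffer buffer);
    }

    // Each of the 16 benchmark threads runs this loop: one allocate/release cycle
    // per iteration, roughly one "request" worth of pool traffic. Buffers are
    // released immediately, so the pool is never exhausted -- this measures normal
    // operation, not out-of-memory behaviour.
    class AllocReleaseLoop implements Runnable {
        private final MemoryPool pool;
        private final int bufSize; // 1k, 10k, 100k or 1M in the runs above
        private volatile boolean running = true;

        AllocReleaseLoop(MemoryPool pool, int bufSize) {
            this.pool = pool;
            this.bufSize = bufSize;
        }

        @Override
        public void run() {
            while (running) {
                ByteBuffer buf = pool.tryAllocate(bufSize);
                if (buf != null)
                    pool.release(buf);
            }
        }

        void stop() { running = false; }
    }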
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi, Radai,

Sorry for the late response. How should the benchmark results be interpreted? The higher the ops/s, the better? It would also be useful to test this out on LinkedIn's traffic with enough socket connections to see if there is any performance degradation.

Also, there is a separate proposal, KIP-81, to bound the consumer memory usage. Perhaps you can chime in there on whether this proposal can be utilized there too.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Thanks,

Jun

On Tue, Sep 27, 2016 at 10:23 AM, radai wrote:

> Hi Jun,
>
> 10 - mute/unmute functionality has been added in https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. I have yet to run stress tests to see how it behaves versus without muting
>
> 11 - I've added a SimplePool implementation (nothing more than an AtomicLong really) and compared it with my GC pool (that uses weak refs) - https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks. the results show no noticeable difference. what the results _do_ show though is that for large requests (1M) performance drops very sharply. since the SimplePool is essentially identical to current kafka code behaviour (the benchmark never reaches out-of-memory conditions) it would suggest to me that kafka performance for large requests suffers greatly from the cost of allocating (and releasing) large buffers (instead of actually pooling them for later re-use). this means that an implementation of a memory pool that actually pools ( :-) ) is very likely to improve broker performance for large requests.
>
> 12 - if there was a single thread iterating over selection keys then stopping at the 1st unsatisfiable request might work (if iteration order over selection keys is deterministic, which is OS-dependent). however, kafka spawns multiple selectors sharing the same pool so I doubt the approach would gain anything. also notice that the current code already shuffles the selection keys if memory is low (<10%) to try and guarantee fairness.
>
> attached the benchmark results for the pool implementations:
>
> Benchmark                                        Mode  Cnt        Score        Error  Units
> GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
> SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
> SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
> SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
> SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s
>
> On Sat, Sep 24, 2016 at 9:07 AM, Jun Rao wrote:
>
> > Hi, Radai,
> >
> > For 10, yes, we don't want the buffer pool to wake up the selector every time some memory is freed up. We only want to do that when there are pending requests to the buffer pool not honored due to not enough memory.
> >
> > For 11, we probably want to be a bit careful with Weak References. In https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an implementation based on Weak Reference, but abandoned it due to too much GC overhead. It probably also makes the code a bit harder to understand. So, perhaps it would be better if we can avoid it.
> > For 12, that's a good point. I thought the selector will do some shuffling for fairness. Perhaps we should stop allocating from the buffer pool when we see the first key whose memory can't be honored?
> >
> > Thanks,
> >
> > Jun
> >
> > On Sat, Sep 24, 2016 at 8:44 AM, radai wrote:
> >
> > > Hi Jun,
> > >
> > > 10 - I'll add this functionality to the mute/unmute branch. as every mute/unmute operation is O(#connections / #selectorThreads), maybe a watermark approach is better than waking when _any_ mem is available?
> > >
> > > 11 - "gc notifications" are done by using a ReferenceQueue (https://docs.oracle.com/javase/8/docs/api/java/lang/ref/ReferenceQueue.html) in combination with weak references to allocated buffers. when a buffer is reclaimed by the GC the corresponding weak ref to it is enqueued. the pool maintains a set of outstanding buffer IDs (every allocated buffer gets a unique id - basically a sequence). a buffer explicitly returned has its id removed from the tracking set and the weak reference to it destroyed, so its reference will never be enqueued by the GC even if it is GC'ed later. an enqueued reference (which indicates a buffer not returned to the pool) also carries the buffer id, which is then removed from the outstanding buffers set and the memory marked as available (and a warning printed).
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi Jun,

10 - mute/unmute functionality has been added in https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. I have yet to run stress tests to see how it behaves versus without muting

11 - I've added a SimplePool implementation (nothing more than an AtomicLong really) and compared it with my GC pool (that uses weak refs) - https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks. the results show no noticeable difference. what the results _do_ show though is that for large requests (1M) performance drops very sharply. since the SimplePool is essentially identical to current kafka code behaviour (the benchmark never reaches out-of-memory conditions) it would suggest to me that kafka performance for large requests suffers greatly from the cost of allocating (and releasing) large buffers (instead of actually pooling them for later re-use). this means that an implementation of a memory pool that actually pools ( :-) ) is very likely to improve broker performance for large requests.

12 - if there was a single thread iterating over selection keys then stopping at the 1st unsatisfiable request might work (if iteration order over selection keys is deterministic, which is OS-dependent). however, kafka spawns multiple selectors sharing the same pool so I doubt the approach would gain anything. also notice that the current code already shuffles the selection keys if memory is low (<10%) to try and guarantee fairness.

attached the benchmark results for the pool implementations:

Benchmark                                        Mode  Cnt        Score        Error  Units
GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s

On Sat, Sep 24, 2016 at 9:07 AM, Jun Rao wrote:

> Hi, Radai,
>
> For 10, yes, we don't want the buffer pool to wake up the selector every time some memory is freed up. We only want to do that when there are pending requests to the buffer pool not honored due to not enough memory.
>
> For 11, we probably want to be a bit careful with Weak References. In https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an implementation based on Weak Reference, but abandoned it due to too much GC overhead. It probably also makes the code a bit harder to understand. So, perhaps it would be better if we can avoid it.
>
> For 12, that's a good point. I thought the selector will do some shuffling for fairness. Perhaps we should stop allocating from the buffer pool when we see the first key whose memory can't be honored?
>
> Thanks,
>
> Jun
>
> On Sat, Sep 24, 2016 at 8:44 AM, radai wrote:
>
> > Hi Jun,
> >
> > 10 - I'll add this functionality to the mute/unmute branch. as every mute/unmute operation is O(#connections / #selectorThreads), maybe a watermark approach is better than waking when _any_ mem is available?
> >
> > 11 - "gc notifications" are done by using a ReferenceQueue (https://docs.oracle.com/javase/8/docs/api/java/lang/ref/ReferenceQueue.html) in combination with weak references to allocated buffers.
> > when a buffer is reclaimed by the GC the corresponding weak ref to it is enqueued. the pool maintains a set of outstanding buffer IDs (every allocated buffer gets a unique id - basically a sequence). a buffer explicitly returned has its id removed from the tracking set and the weak reference to it destroyed, so its reference will never be enqueued by the GC even if it is GC'ed later. an enqueued reference (which indicates a buffer not returned to the pool) also carries the buffer id, which is then removed from the outstanding buffers set and the memory marked as available (and a warning printed). the pool has a background thread dedicated to reading references out of the queue (which under normal conditions remains blocked forever).
> >
> > 12 - the issue here is that a single "large" request (say 1MB) can get blocked indefinitely under a high pressure of much smaller requests (say 1KB) keeping memory utilization close to 100%. if we don't care about potential starvation the change is in a single condition. I'll make this configurable.
> >
> > 13 - I'll change the parameter name.
> >
> > On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao wrote:
> >
> > > Hi, Radai,
> > >
> > > Thanks for the updated KIP. A few more questions/comments below.
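A sketch of what the SimplePool compared above amounts to - an AtomicLong of available bytes and nothing else. The lenient semantics (any request is satisfied while any memory is available, so the counter may go negative) are the ones described in the Sep 13 mails further down the thread; class and method names are illustrative, not the actual patch:

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative stand-in for the "SimplePool" benchmarked above: a plain
    // byte counter acting as a limiter. It is deliberately lenient -- if *any*
    // memory is available it satisfies *any* request, so availability may go
    // negative (this avoids starving large requests). The effective bound is
    // therefore the configured pool size plus one maximum-size request.
    public class SimpleMemoryPool {
        private final AtomicLong availableBytes;

        public SimpleMemoryPool(long sizeBytes) {
            this.availableBytes = new AtomicLong(sizeBytes);
        }

        /** Non-blocking: returns null when out of memory, never waits. */
        public ByteBuffer tryAllocate(int sizeBytes) {
            if (availableBytes.get() <= 0)
                return null;
            availableBytes.addAndGet(-sizeBytes); // may drop below zero by design
            return ByteBuffer.allocate(sizeBytes);
        }

        public void release(ByteBuffer buffer) {
            availableBytes.addAndGet(buffer.capacity());
        }
    }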
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi, Radai,

For 10, yes, we don't want the buffer pool to wake up the selector every time some memory is freed up. We only want to do that when there are pending requests to the buffer pool not honored due to not enough memory.

For 11, we probably want to be a bit careful with Weak References. In https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an implementation based on Weak Reference, but abandoned it due to too much GC overhead. It probably also makes the code a bit harder to understand. So, perhaps it would be better if we can avoid it.

For 12, that's a good point. I thought the selector will do some shuffling for fairness. Perhaps we should stop allocating from the buffer pool when we see the first key whose memory can't be honored?

Thanks,

Jun

On Sat, Sep 24, 2016 at 8:44 AM, radai wrote:

> Hi Jun,
>
> 10 - I'll add this functionality to the mute/unmute branch. as every mute/unmute operation is O(#connections / #selectorThreads), maybe a watermark approach is better than waking when _any_ mem is available?
>
> 11 - "gc notifications" are done by using a ReferenceQueue (https://docs.oracle.com/javase/8/docs/api/java/lang/ref/ReferenceQueue.html) in combination with weak references to allocated buffers. when a buffer is reclaimed by the GC the corresponding weak ref to it is enqueued. the pool maintains a set of outstanding buffer IDs (every allocated buffer gets a unique id - basically a sequence). a buffer explicitly returned has its id removed from the tracking set and the weak reference to it destroyed, so its reference will never be enqueued by the GC even if it is GC'ed later. an enqueued reference (which indicates a buffer not returned to the pool) also carries the buffer id, which is then removed from the outstanding buffers set and the memory marked as available (and a warning printed). the pool has a background thread dedicated to reading references out of the queue (which under normal conditions remains blocked forever).
>
> 12 - the issue here is that a single "large" request (say 1MB) can get blocked indefinitely under a high pressure of much smaller requests (say 1KB) keeping memory utilization close to 100%. if we don't care about potential starvation the change is in a single condition. I'll make this configurable.
>
> 13 - I'll change the parameter name.
>
> On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao wrote:
>
> > Hi, Radai,
> >
> > Thanks for the updated KIP. A few more questions/comments below.
> >
> > 10. For "the mute/unmute happens just before poll(), which means as a worst case there will be no reads for 300ms if memory was unavailable", I am thinking that the memory pool could track if there is any pending request and wake up the selector when memory is released and there is a pending request. This way, poll() doesn't have to wait for the timeout if memory frees up early.
> >
> > 11. For "to facilitate faster implementation (as a safety net) the pool will be implemented in such a way that memory that was not release()ed (but still garbage collected) would be detected and "reclaimed". this is to prevent "leaks" in case of code paths that fail to release() properly.", could you explain a bit at the high level how this is done?
For "As the pool would allow any size request if it has any capacity > > available, the actual memory bound is queued.max.bytes + > > socket.request.max.bytes.", it seems intuitively, the pool should only > give > > the Buffer back if it has enough available bytes. Then the request memory > > can be bounded by queued.max.bytes. We can validate that queued.max.bytes > > is at least socket.request.max.bytes. > > > > 13. For the naming, it seems request.queue.max.bytes is clearer than > > queue.max.bytes. > > > > Jun > > > > > > > > On Thu, Sep 22, 2016 at 10:53 AM, radai > > wrote: > > > > > As discussed in the KIP call, I have updated the kip-72 page ( > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) > > > to record both configuration validations and implementation concerns. > > > I've also implemented channel muting/unmuting in response to memory > > > pressure. its available as a separate branch here - > > > https://github.com/radai-rosenblatt/kafka/tree/broker- > > > memory-pool-with-muting > > > . the implementation without muting is here - > > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool. > > > > > > the mute/unmute happens just before poll(), which means as a worst case > > > there will be no reads for 300ms if memory was unavailable (thats the > > > timeout untill the next poll). perhaps a design with dedicated read > > threads > > > could do better (such a thread could actually block waiting for > memory), > > > but that would be a giant change. > > > > > > On Tue, Sep 13, 2016 at 9:20 AM, radai > > wrote: > > > > > > > the specific memo
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi Jun,

10 - I'll add this functionality to the mute/unmute branch. as every mute/unmute operation is O(#connections / #selectorThreads), maybe a watermark approach is better than waking when _any_ mem is available?

11 - "gc notifications" are done by using a ReferenceQueue (https://docs.oracle.com/javase/8/docs/api/java/lang/ref/ReferenceQueue.html) in combination with weak references to allocated buffers. when a buffer is reclaimed by the GC the corresponding weak ref to it is enqueued. the pool maintains a set of outstanding buffer IDs (every allocated buffer gets a unique id - basically a sequence). a buffer explicitly returned has its id removed from the tracking set and the weak reference to it destroyed, so its reference will never be enqueued by the GC even if it is GC'ed later. an enqueued reference (which indicates a buffer not returned to the pool) also carries the buffer id, which is then removed from the outstanding buffers set and the memory marked as available (and a warning printed). the pool has a background thread dedicated to reading references out of the queue (which under normal conditions remains blocked forever).

12 - the issue here is that a single "large" request (say 1MB) can get blocked indefinitely under a high pressure of much smaller requests (say 1KB) keeping memory utilization close to 100%. if we don't care about potential starvation the change is in a single condition. I'll make this configurable.

13 - I'll change the parameter name.

On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao wrote:

> Hi, Radai,
>
> Thanks for the updated KIP. A few more questions/comments below.
>
> 10. For "the mute/unmute happens just before poll(), which means as a worst case there will be no reads for 300ms if memory was unavailable", I am thinking that the memory pool could track if there is any pending request and wake up the selector when memory is released and there is a pending request. This way, poll() doesn't have to wait for the timeout if memory frees up early.
>
> 11. For "to facilitate faster implementation (as a safety net) the pool will be implemented in such a way that memory that was not release()ed (but still garbage collected) would be detected and "reclaimed". this is to prevent "leaks" in case of code paths that fail to release() properly.", could you explain a bit at the high level how this is done?
>
> 12. For "As the pool would allow any size request if it has any capacity available, the actual memory bound is queued.max.bytes + socket.request.max.bytes.", it seems that, intuitively, the pool should only give the Buffer back if it has enough available bytes. Then the request memory can be bounded by queued.max.bytes. We can validate that queued.max.bytes is at least socket.request.max.bytes.
>
> 13. For the naming, it seems request.queue.max.bytes is clearer than queue.max.bytes.
>
> Jun
>
> On Thu, Sep 22, 2016 at 10:53 AM, radai wrote:
>
> > As discussed in the KIP call, I have updated the kip-72 page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) to record both configuration validations and implementation concerns. I've also implemented channel muting/unmuting in response to memory pressure. it's available as a separate branch here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. the implementation without muting is here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
> > the mute/unmute happens just before poll(), which means as a worst case there will be no reads for 300ms if memory was unavailable (that's the timeout until the next poll). perhaps a design with dedicated read threads could do better (such a thread could actually block waiting for memory), but that would be a giant change.
> >
> > On Tue, Sep 13, 2016 at 9:20 AM, radai wrote:
> >
> > > the specific memory pool implementation I wrote will allocate _any_ amount you request if it has _any_ memory available (so if it has 1 byte available and you ask for 1MB you will get 1MB and the counter will go negative). this was done to avoid issues with starvation of large requests. other implementations may be more strict. to me this means that generally it's not a simple "have memory" vs "no memory" split (which gets worse under a hypothetical tiered pool scheme for QoS).
> > >
> > > to allow this flexibility in pool implementation I must preserve the amount of memory required. once read from the channel I can't put it back, so I store it?
> > >
> > > On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
> > >
> > > > Is there any value in allowing the 4-byte size to be read even when the request memory limit has been reached? If not, you can disable OP_READ interest for all channels that are ready inside Selector.poll() when memory limit has been reached and re-enable before returning from poll().
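A minimal sketch of the leak-detection scheme described in point 11 above. The real patch tracks a set of outstanding buffer ids; in this sketch the weak reference itself (with identity-based equality) doubles as the tracking handle, and all class and method names are illustrative:

    import java.lang.ref.ReferenceQueue;
    import java.lang.ref.WeakReference;
    import java.nio.ByteBuffer;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    public class GarbageCollectedMemoryPool {

        // Weak ref to an allocated buffer, carrying its sequence id and size.
        private static final class BufferReference extends WeakReference<ByteBuffer> {
            final long id;          // "basically a sequence"
            final int sizeBytes;
            final int identityHash; // captured so lookups work after the referent is GC'ed

            BufferReference(ByteBuffer buf, long id, ReferenceQueue<ByteBuffer> q) {
                super(buf, q);
                this.id = id;
                this.sizeBytes = buf.capacity();
                this.identityHash = System.identityHashCode(buf);
            }

            @Override public int hashCode() { return identityHash; }

            @Override public boolean equals(Object o) {
                if (this == o) return true;
                if (!(o instanceof BufferReference)) return false;
                ByteBuffer mine = this.get();
                return mine != null && mine == ((BufferReference) o).get();
            }
        }

        private final AtomicLong available;
        private final AtomicLong idSequence = new AtomicLong();
        private final ReferenceQueue<ByteBuffer> gcQueue = new ReferenceQueue<>();
        private final ConcurrentHashMap<BufferReference, BufferReference> outstanding =
                new ConcurrentHashMap<>();

        public GarbageCollectedMemoryPool(long sizeBytes) {
            this.available = new AtomicLong(sizeBytes);
            // background thread reading the queue; under normal conditions it
            // remains blocked forever
            Thread reaper = new Thread(this::reapLeakedBuffers, "memory-pool-reaper");
            reaper.setDaemon(true);
            reaper.start();
        }

        public ByteBuffer tryAllocate(int sizeBytes) {
            if (available.get() <= 0)
                return null; // non-blocking: caller makes progress elsewhere
            available.addAndGet(-sizeBytes);
            ByteBuffer buf = ByteBuffer.allocate(sizeBytes);
            BufferReference ref = new BufferReference(buf, idSequence.incrementAndGet(), gcQueue);
            outstanding.put(ref, ref);
            return buf;
        }

        public void release(ByteBuffer buffer) {
            // an explicitly returned buffer is removed from tracking and its
            // weak ref cleared, so the GC will never enqueue it
            BufferReference tracked = outstanding.remove(new BufferReference(buffer, -1, null));
            if (tracked != null) {
                tracked.clear();
                available.addAndGet(tracked.sizeBytes);
            }
        }

        private void reapLeakedBuffers() {
            while (true) {
                try {
                    // an enqueued reference means a buffer was GC'ed without release()
                    BufferReference leaked = (BufferReference) gcQueue.remove();
                    if (outstanding.remove(leaked) != null) {
                        available.addAndGet(leaked.sizeBytes);
                        System.err.println("WARN: buffer " + leaked.id + " ("
                                + leaked.sizeBytes + " bytes) was never released; reclaimed");
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }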
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi, Radai,

Thanks for the updated KIP. A few more questions/comments below.

10. For "the mute/unmute happens just before poll(), which means as a worst case there will be no reads for 300ms if memory was unavailable", I am thinking that the memory pool could track if there is any pending request and wake up the selector when memory is released and there is a pending request. This way, poll() doesn't have to wait for the timeout if memory frees up early.

11. For "to facilitate faster implementation (as a safety net) the pool will be implemented in such a way that memory that was not release()ed (but still garbage collected) would be detected and "reclaimed". this is to prevent "leaks" in case of code paths that fail to release() properly.", could you explain a bit at the high level how this is done?

12. For "As the pool would allow any size request if it has any capacity available, the actual memory bound is queued.max.bytes + socket.request.max.bytes.", it seems that, intuitively, the pool should only give the Buffer back if it has enough available bytes. Then the request memory can be bounded by queued.max.bytes. We can validate that queued.max.bytes is at least socket.request.max.bytes.

13. For the naming, it seems request.queue.max.bytes is clearer than queue.max.bytes.

Jun

On Thu, Sep 22, 2016 at 10:53 AM, radai wrote:

> As discussed in the KIP call, I have updated the kip-72 page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) to record both configuration validations and implementation concerns. I've also implemented channel muting/unmuting in response to memory pressure. it's available as a separate branch here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. the implementation without muting is here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
>
> the mute/unmute happens just before poll(), which means as a worst case there will be no reads for 300ms if memory was unavailable (that's the timeout until the next poll). perhaps a design with dedicated read threads could do better (such a thread could actually block waiting for memory), but that would be a giant change.
>
> On Tue, Sep 13, 2016 at 9:20 AM, radai wrote:
>
> > the specific memory pool implementation I wrote will allocate _any_ amount you request if it has _any_ memory available (so if it has 1 byte available and you ask for 1MB you will get 1MB and the counter will go negative). this was done to avoid issues with starvation of large requests. other implementations may be more strict. to me this means that generally it's not a simple "have memory" vs "no memory" split (which gets worse under a hypothetical tiered pool scheme for QoS).
> >
> > to allow this flexibility in pool implementation I must preserve the amount of memory required. once read from the channel I can't put it back, so I store it?
> >
> > On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
> >
> > > Is there any value in allowing the 4-byte size to be read even when the request memory limit has been reached? If not, you can disable OP_READ interest for all channels that are ready inside Selector.poll() when memory limit has been reached and re-enable before returning from poll(). Perhaps a listener that is invoked when MemoryPool moves from unavailable to available state can wake up the selector.
> > > The changes for this should be fairly contained without any additional channel state. And it would avoid the overhead of polls that return immediately even when progress cannot be made because the memory limit has been reached.
> > >
> > > On Tue, Sep 13, 2016 at 12:31 AM, radai wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Yes, you're right - right now the next select() call will return immediately with the same set of keys as earlier (at least), as they were not previously handled (no memory). My assumption is that this happens under considerable load - something has to be occupying all this memory. also, this happens in the context of SocketServer.Processor.run():
> > > >
> > > > while (isRunning) {
> > > >   configureNewConnections()
> > > >   processNewResponses()
> > > >   poll() <-- HERE
> > > >   processCompletedReceives()
> > > >   processCompletedSends()
> > > >   processDisconnected()
> > > > }
> > > >
> > > > even within poll(), things like finishConnection(), prepare(), and write()s can still make progress under low memory conditions. and given the load, there's probably progress to be made in processCompletedReceives(), processCompletedSends() and processDisconnected().
> > > >
> > > > if there's progress to be made in other things it's likely that the next call to poll() will not happen immediately and so the loop won't be that tight.
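A sketch of the startup validation Jun suggests in point 12 above; the config names follow this discussion (point 13 debates the final naming), and the check itself is an assumption about how the patch might enforce the bound:

    import org.apache.kafka.common.config.ConfigException;

    // Validate at broker startup that the request-queue byte bound can fit at
    // least one maximum-size request; otherwise a single large request could
    // never be admitted.
    final class QueuedBytesValidation {
        static void validate(long queuedMaxBytes, long socketRequestMaxBytes) {
            if (queuedMaxBytes > 0 && queuedMaxBytes < socketRequestMaxBytes)
                throw new ConfigException("queued.max.bytes (" + queuedMaxBytes
                        + ") must be at least socket.request.max.bytes ("
                        + socketRequestMaxBytes + ")");
        }
    }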
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
As discussed in the KIP call, I have updated the kip-72 page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) to record both configuration validations and implementation concerns. I've also implemented channel muting/unmuting in response to memory pressure. it's available as a separate branch here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting. the implementation without muting is here - https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.

the mute/unmute happens just before poll(), which means as a worst case there will be no reads for 300ms if memory was unavailable (that's the timeout until the next poll). perhaps a design with dedicated read threads could do better (such a thread could actually block waiting for memory), but that would be a giant change.

On Tue, Sep 13, 2016 at 9:20 AM, radai wrote:

> the specific memory pool implementation I wrote will allocate _any_ amount you request if it has _any_ memory available (so if it has 1 byte available and you ask for 1MB you will get 1MB and the counter will go negative). this was done to avoid issues with starvation of large requests. other implementations may be more strict. to me this means that generally it's not a simple "have memory" vs "no memory" split (which gets worse under a hypothetical tiered pool scheme for QoS).
>
> to allow this flexibility in pool implementation I must preserve the amount of memory required. once read from the channel I can't put it back, so I store it?
>
> On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
>
> > Is there any value in allowing the 4-byte size to be read even when the request memory limit has been reached? If not, you can disable OP_READ interest for all channels that are ready inside Selector.poll() when memory limit has been reached and re-enable before returning from poll(). Perhaps a listener that is invoked when MemoryPool moves from unavailable to available state can wake up the selector. The changes for this should be fairly contained without any additional channel state. And it would avoid the overhead of polls that return immediately even when progress cannot be made because the memory limit has been reached.
> >
> > On Tue, Sep 13, 2016 at 12:31 AM, radai wrote:
> >
> > > Hi Jun,
> > >
> > > Yes, you're right - right now the next select() call will return immediately with the same set of keys as earlier (at least), as they were not previously handled (no memory). My assumption is that this happens under considerable load - something has to be occupying all this memory. also, this happens in the context of SocketServer.Processor.run():
> > >
> > > while (isRunning) {
> > >   configureNewConnections()
> > >   processNewResponses()
> > >   poll() <-- HERE
> > >   processCompletedReceives()
> > >   processCompletedSends()
> > >   processDisconnected()
> > > }
> > >
> > > even within poll(), things like finishConnection(), prepare(), and write()s can still make progress under low memory conditions. and given the load, there's probably progress to be made in processCompletedReceives(), processCompletedSends() and processDisconnected().
> > >
> > > if there's progress to be made in other things it's likely that the next call to poll() will not happen immediately and so the loop won't be that tight.
> > > in order for this to devolve into true busy waiting you would need a situation where no progress can be made on any in-progress requests and no responses to send out?
> > >
> > > if my assumption does not hold then you are correct, and the selector.poll(300) currently hardcoded in SocketServer.Processor.poll() would need to be replaced with something more complicated. my biggest point of concern though is that the resulting code would be complicated and would couple the Selector to the memory pool very tightly. under my current patch the Selector needs the memory pool only to pass to channels when they are built. this would allow different memory pools relatively easily for things like reserving memory for cross-broker replication and high-SLA connections. a tighter coupling would make any such future modification hard.
> > >
> > > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao wrote:
> > >
> > > > Hi, Radai,
> > > >
> > > > Thanks for the reply. I still have a followup question on #2.
> > > >
> > > > My understanding is that in your proposal, the selector will now first read the size of the Receive. If there is not enough memory, it has to turn off the READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent selector.poll() calls will always return immediately, adding unnecessary overhead. If you do that, the Selector will need to know when to turn on the READ interest bit again.
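A sketch of the mute-before-poll approach from the branch linked above, shaped after the Processor.run() loop quoted in this thread. isOutOfMemory() matches the pool interface sketched at the end of the thread, while muteAll()/unmuteAll() are assumed helpers, not confirmed names from the patch:

    // Sketch only: mirrors SocketServer.Processor.run() as quoted above, with
    // memory-pressure muting added before poll(). When the pool is exhausted we
    // stop reading from all sockets; once memory frees up we resume. Worst case,
    // reads stall for one poll timeout (300ms) before the unmute takes effect.
    boolean channelsMuted = false;

    void runLoop() {
        while (isRunning) {
            configureNewConnections();
            processNewResponses();
            if (memoryPool.isOutOfMemory() && !channelsMuted) {
                muteAll();        // clear OP_READ interest on every channel
                channelsMuted = true;
            } else if (!memoryPool.isOutOfMemory() && channelsMuted) {
                unmuteAll();      // memory is available again: resume reads
                channelsMuted = false;
            }
            poll();               // reads skipped while muted
            processCompletedReceives();
            processCompletedSends();
            processDisconnected();
        }
    }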
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
the specific memory pool implementation I wrote will allocate _any_ amount you request if it has _any_ memory available (so if it has 1 byte available and you ask for 1MB you will get 1MB and the counter will go negative). this was done to avoid issues with starvation of large requests. other implementations may be more strict. to me this means that generally it's not a simple "have memory" vs "no memory" split (which gets worse under a hypothetical tiered pool scheme for QoS).

to allow this flexibility in pool implementation I must preserve the amount of memory required. once read from the channel I can't put it back, so I store it?

On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:

> Is there any value in allowing the 4-byte size to be read even when the request memory limit has been reached? If not, you can disable OP_READ interest for all channels that are ready inside Selector.poll() when memory limit has been reached and re-enable before returning from poll(). Perhaps a listener that is invoked when MemoryPool moves from unavailable to available state can wake up the selector. The changes for this should be fairly contained without any additional channel state. And it would avoid the overhead of polls that return immediately even when progress cannot be made because the memory limit has been reached.
>
> On Tue, Sep 13, 2016 at 12:31 AM, radai wrote:
>
> > Hi Jun,
> >
> > Yes, you're right - right now the next select() call will return immediately with the same set of keys as earlier (at least), as they were not previously handled (no memory). My assumption is that this happens under considerable load - something has to be occupying all this memory. also, this happens in the context of SocketServer.Processor.run():
> >
> > while (isRunning) {
> >   configureNewConnections()
> >   processNewResponses()
> >   poll() <-- HERE
> >   processCompletedReceives()
> >   processCompletedSends()
> >   processDisconnected()
> > }
> >
> > even within poll(), things like finishConnection(), prepare(), and write()s can still make progress under low memory conditions. and given the load, there's probably progress to be made in processCompletedReceives(), processCompletedSends() and processDisconnected().
> >
> > if there's progress to be made in other things it's likely that the next call to poll() will not happen immediately and so the loop won't be that tight. in order for this to devolve into true busy waiting you would need a situation where no progress can be made on any in-progress requests and no responses to send out?
> >
> > if my assumption does not hold then you are correct, and the selector.poll(300) currently hardcoded in SocketServer.Processor.poll() would need to be replaced with something more complicated. my biggest point of concern though is that the resulting code would be complicated and would couple the Selector to the memory pool very tightly. under my current patch the Selector needs the memory pool only to pass to channels when they are built. this would allow different memory pools relatively easily for things like reserving memory for cross-broker replication and high-SLA connections. a tighter coupling would make any such future modification hard.
> >
> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao wrote:
> >
> > > Hi, Radai,
> > >
> > > Thanks for the reply. I still have a followup question on #2.
> > > My understanding is that in your proposal, the selector will now first read the size of the Receive. If there is not enough memory, it has to turn off the READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent selector.poll() calls will always return immediately, adding unnecessary overhead. If you do that, the Selector will need to know when to turn on the READ interest bit again. It may not be enough to do this check until the next poll call since the timeout used by poll() could be arbitrarily large. So, it seems that some kind of coordination between the Selector and the bufferpool is needed?
> > >
> > > Jun
> > >
> > > On Thu, Sep 8, 2016 at 7:02 PM, radai wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 1. yes, it is my own personal opinion that people use queued.max.requests as an indirect way to bound memory consumption. once a more direct memory-bound mechanism exists (and works) I don't think queued.max.requests would be required. having said that, I was not planning on making any changes w.r.t. queued.max.requests support (so I was aiming to get to a situation where both configs are supported) to allow gathering enough data/feedback.
> > > >
> > > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a NetworkReceive. multiple such read() calls may be required until a Receive is produced already in the current code base.
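A sketch of the "store the size" idea from radai's mail above: once the 4-byte size prefix has been read off the socket it can't be pushed back, so it is remembered until the pool can supply a buffer on some later poll(). Class and method names are illustrative (the real class would be along the lines of NetworkReceive); MemoryPool is the two-method interface sketched earlier in the thread, and end-of-stream handling is omitted:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    // Illustrative receive that defers payload allocation until the pool has
    // memory. The 4-byte size buffer is always allocated (tiny, bounded per
    // connection); the payload comes from the pool and may arrive many polls
    // later.
    public class SizePrefixedReceive {
        private final ByteBuffer sizeBuf = ByteBuffer.allocate(4);
        private ByteBuffer payload; // null until the pool grants the request

        /** Returns true once the full request body has been read. */
        public boolean readFrom(ReadableByteChannel channel, MemoryPool pool) throws IOException {
            if (sizeBuf.hasRemaining()) {
                channel.read(sizeBuf);
                if (sizeBuf.hasRemaining())
                    return false;              // size prefix not fully read yet
            }
            if (payload == null) {
                int size = sizeBuf.getInt(0);  // the remembered request size
                payload = pool.tryAllocate(size);
                if (payload == null)
                    return false;              // no memory right now; retry on a later poll()
            }
            channel.read(payload);
            return !payload.hasRemaining();
        }
    }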
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Is there any value in allowing the 4-byte size to be read even when the request memory limit has been reached? If not, you can disable OP_READ interest for all channels that are ready inside Selector.poll() when the memory limit has been reached and re-enable it before returning from poll(). Perhaps a listener that is invoked when the MemoryPool moves from the unavailable to the available state can wake up the selector. The changes for this should be fairly contained without any additional channel state. And it would avoid the overhead of polls that return immediately even when progress cannot be made because the memory limit has been reached.

On Tue, Sep 13, 2016 at 12:31 AM, radai wrote:

> Hi Jun,
>
> Yes, you're right - right now the next select() call will return immediately with the same set of keys as earlier (at least), as they were not previously handled (no memory). My assumption is that this happens under considerable load - something has to be occupying all this memory. also, this happens in the context of SocketServer.Processor.run():
>
> while (isRunning) {
>   configureNewConnections()
>   processNewResponses()
>   poll() <-- HERE
>   processCompletedReceives()
>   processCompletedSends()
>   processDisconnected()
> }
>
> even within poll(), things like finishConnection(), prepare(), and write()s can still make progress under low memory conditions. and given the load, there's probably progress to be made in processCompletedReceives(), processCompletedSends() and processDisconnected().
>
> if there's progress to be made in other things it's likely that the next call to poll() will not happen immediately and so the loop won't be that tight. in order for this to devolve into true busy waiting you would need a situation where no progress can be made on any in-progress requests and no responses to send out?
>
> if my assumption does not hold then you are correct, and the selector.poll(300) currently hardcoded in SocketServer.Processor.poll() would need to be replaced with something more complicated. my biggest point of concern though is that the resulting code would be complicated and would couple the Selector to the memory pool very tightly. under my current patch the Selector needs the memory pool only to pass to channels when they are built. this would allow different memory pools relatively easily for things like reserving memory for cross-broker replication and high-SLA connections. a tighter coupling would make any such future modification hard.
>
> On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao wrote:
>
> > Hi, Radai,
> >
> > Thanks for the reply. I still have a followup question on #2.
> >
> > My understanding is that in your proposal, the selector will now first read the size of the Receive. If there is not enough memory, it has to turn off the READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent selector.poll() calls will always return immediately, adding unnecessary overhead. If you do that, the Selector will need to know when to turn on the READ interest bit again. It may not be enough to do this check until the next poll call since the timeout used by poll() could be arbitrarily large. So, it seems that some kind of coordination between the Selector and the bufferpool is needed?
> >
> > Jun
> >
> > On Thu, Sep 8, 2016 at 7:02 PM, radai wrote:
> >
> > > Hi Jun,
> > >
> > > 1. yes, it is my own personal opinion that people use queued.max.requests as an indirect way to bound memory consumption.
> > > once a more direct memory-bound mechanism exists (and works) I don't think queued.max.requests would be required. having said that, I was not planning on making any changes w.r.t. queued.max.requests support (so I was aiming to get to a situation where both configs are supported) to allow gathering enough data/feedback.
> > >
> > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a NetworkReceive. multiple such read() calls may be required until a Receive is produced already in the current code base. my pool implementation is non-blocking, so if there's no memory available the read() call will return null. poll() would then move on to try and service other selection keys. the pool will be checked for available memory again the next time the SocketServer.run() loop gets to poll(), and so right now I don't communicate memory becoming available to the selector - it will just go on to try and make progress elsewhere and come back again. I never block it or send it to sleep. I think for efficiency what could maybe be done is that if there's not enough memory to service a readable selection key we may want to skip all other read-ready selection keys for that iteration of pollSelectionKeys(). that would require rather invasive changes around Selector.pollSelectionKeys() that I'd rather avoid.
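A sketch of Rajini's suggestion above - drop OP_READ interest while the pool is exhausted, restore it before poll() returns, and let a pool-availability listener wake the selector. The listener hook is an assumption; nothing beyond Rajini's description confirms its shape:

    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;

    // Sketch of the interest-bit handling Rajini describes. Inside poll(), when
    // the pool is exhausted, reads are switched off for the ready channels and
    // switched back on before poll() returns -- no extra per-channel state.
    final class ReadInterestControl {

        static void disableRead(SelectionKey key) {
            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        }

        static void enableRead(SelectionKey key) {
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
        }

        // Assumed listener hook: invoked once when the pool moves from
        // unavailable to available, so a blocked poll() wakes immediately
        // instead of waiting out its full timeout.
        static Runnable wakeupListenerFor(Selector selector) {
            return selector::wakeup;
        }
    }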
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi Jun,

Yes, you're right - right now the next select() call will return immediately with the same set of keys as earlier (at least), as they were not previously handled (no memory). My assumption is that this happens under considerable load - something has to be occupying all this memory. also, this happens in the context of SocketServer.Processor.run():

while (isRunning) {
  configureNewConnections()
  processNewResponses()
  poll() <-- HERE
  processCompletedReceives()
  processCompletedSends()
  processDisconnected()
}

even within poll(), things like finishConnection(), prepare(), and write()s can still make progress under low memory conditions. and given the load, there's probably progress to be made in processCompletedReceives(), processCompletedSends() and processDisconnected().

if there's progress to be made in other things it's likely that the next call to poll() will not happen immediately and so the loop won't be that tight. in order for this to devolve into true busy waiting you would need a situation where no progress can be made on any in-progress requests and no responses to send out?

if my assumption does not hold then you are correct, and the selector.poll(300) currently hardcoded in SocketServer.Processor.poll() would need to be replaced with something more complicated. my biggest point of concern though is that the resulting code would be complicated and would couple the Selector to the memory pool very tightly. under my current patch the Selector needs the memory pool only to pass to channels when they are built. this would allow different memory pools relatively easily for things like reserving memory for cross-broker replication and high-SLA connections. a tighter coupling would make any such future modification hard.

On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao wrote:

> Hi, Radai,
>
> Thanks for the reply. I still have a followup question on #2.
>
> My understanding is that in your proposal, the selector will now first read the size of the Receive. If there is not enough memory, it has to turn off the READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent selector.poll() calls will always return immediately, adding unnecessary overhead. If you do that, the Selector will need to know when to turn on the READ interest bit again. It may not be enough to do this check until the next poll call since the timeout used by poll() could be arbitrarily large. So, it seems that some kind of coordination between the Selector and the bufferpool is needed?
>
> Jun
>
> On Thu, Sep 8, 2016 at 7:02 PM, radai wrote:
>
> > Hi Jun,
> >
> > 1. yes, it is my own personal opinion that people use queued.max.requests as an indirect way to bound memory consumption. once a more direct memory-bound mechanism exists (and works) I don't think queued.max.requests would be required. having said that, I was not planning on making any changes w.r.t. queued.max.requests support (so I was aiming to get to a situation where both configs are supported) to allow gathering enough data/feedback.
> >
> > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a NetworkReceive. multiple such read() calls may be required until a Receive is produced already in the current code base. my pool implementation is non-blocking, so if there's no memory available the read() call will return null. poll() would then move on to try and service other selection keys.
> > the pool will be checked for available memory again the next time the SocketServer.run() loop gets to poll(), and so right now I don't communicate memory becoming available to the selector - it will just go on to try and make progress elsewhere and come back again. I never block it or send it to sleep. I think for efficiency what could maybe be done is that if there's not enough memory to service a readable selection key we may want to skip all other read-ready selection keys for that iteration of pollSelectionKeys(). that would require rather invasive changes around Selector.pollSelectionKeys() that I'd rather avoid. also, different KafkaChannels may be backed by different memory pools (under some sort of future QoS scheme?), which would complicate such an optimization further.
> >
> > 3. I added the pool interface and implementation under kafka.common.memory, and the API is "thin" enough to be generally useful (currently it's non-blocking only, but a get(long maxWait) is definitely doable). having said that, I'm not really familiar enough with the code to say
> >
> > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao wrote:
> >
> > > Hi, Radai,
> > >
> > > Thanks for the update. At the high level, this looks promising. A few comments below.
> > >
> > > 1. If we can bound the requests by bytes, it seems that we don't need queued.max.requests any more? Could we just deprecate the config and make the queue size unbounded?
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi, Radai,

Thanks for the reply. I still have a followup question on #2.

My understanding is that in your proposal, the selector will now first read the size of the Receive. If there is not enough memory, it has to turn off the READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent selector.poll() calls will always return immediately, adding unnecessary overhead. If you do that, the Selector will need to know when to turn on the READ interest bit again. It may not be enough to do this check until the next poll call since the timeout used by poll() could be arbitrarily large. So, it seems that some kind of coordination between the Selector and the bufferpool is needed?

Jun

On Thu, Sep 8, 2016 at 7:02 PM, radai wrote:

> Hi Jun,
>
> 1. yes, it is my own personal opinion that people use queued.max.requests as an indirect way to bound memory consumption. once a more direct memory-bound mechanism exists (and works) I don't think queued.max.requests would be required. having said that, I was not planning on making any changes w.r.t. queued.max.requests support (so I was aiming to get to a situation where both configs are supported) to allow gathering enough data/feedback.
>
> 2. Selector.poll() calls into KafkaChannel.read() to maybe get a NetworkReceive. multiple such read() calls may be required until a Receive is produced already in the current code base. my pool implementation is non-blocking, so if there's no memory available the read() call will return null. poll() would then move on to try and service other selection keys. the pool will be checked for available memory again the next time the SocketServer.run() loop gets to poll(), and so right now I don't communicate memory becoming available to the selector - it will just go on to try and make progress elsewhere and come back again. I never block it or send it to sleep. I think for efficiency what could maybe be done is that if there's not enough memory to service a readable selection key we may want to skip all other read-ready selection keys for that iteration of pollSelectionKeys(). that would require rather invasive changes around Selector.pollSelectionKeys() that I'd rather avoid. also, different KafkaChannels may be backed by different memory pools (under some sort of future QoS scheme?), which would complicate such an optimization further.
>
> 3. I added the pool interface and implementation under kafka.common.memory, and the API is "thin" enough to be generally useful (currently it's non-blocking only, but a get(long maxWait) is definitely doable). having said that, I'm not really familiar enough with the code to say
>
> On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao wrote:
>
> > Hi, Radai,
> >
> > Thanks for the update. At the high level, this looks promising. A few comments below.
> >
> > 1. If we can bound the requests by bytes, it seems that we don't need queued.max.requests any more? Could we just deprecate the config and make the queue size unbounded?
> >
> > 2. How do we communicate back to the selector when some memory is freed up? We probably need to wake up the selector. For efficiency, perhaps we only need to wake up the selector if the bufferpool is full?
> >
> > 3. We talked about bounding the consumer's memory before. To fully support that, we will need to bound the memory used by different fetch responses in the consumer. Do you think the changes that you propose here can be leveraged to bound the memory in the consumer as well?
> > > > Jun > > > > > > On Tue, Aug 30, 2016 at 10:41 AM, radai > > wrote: > > > > > My apologies for the delay in response. > > > > > > I agree with the concerns about OOM reading from the actual sockets and > > > blocking the network threads - messing with the request queue itself > > would > > > not do. > > > > > > I propose instead a memory pool approach - the broker would have a non > > > blocking memory pool. upon reading the first 4 bytes out of a socket an > > > attempt would be made to acquire enough memory and if that attempt > fails > > > the processing thread will move on to try and make progress with other > > > tasks. > > > > > > I think Its simpler than mute/unmute because using mute/unmute would > > > require differentiating between sockets muted due to a request in > > progress > > > (normal current operation) and sockets muted due to lack of memory. > > sockets > > > of the 1st kind would be unmuted at the end of request processing (as > it > > > happens right now) but the 2nd kind would require some sort of "unmute > > > watchdog" which is (i claim) more complicated than a memory pool. also > a > > > memory pool is a more generic solution. > > > > > > I've updated the KIP page ( > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) > > > to reflect the new proposed implementation, and i've also put up an > > inital > >
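A minimal sketch of the Selector/bufferpool coordination the question above asks about, under invented names (MemoryAwareSelector, tryReserve; none of this is from the actual patch): the pool remembers which channels were muted for lack of memory and, when memory is released, unmutes them and wakes the selector so that an arbitrarily long poll() timeout does not delay the unmute.

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class MemoryAwareSelector {
    private final Selector nioSelector;
    private final AtomicLong availableBytes;
    // channels muted for lack of memory, as opposed to muted because a
    // request from them is already in flight
    private final Queue<SelectionKey> mutedForMemory = new ConcurrentLinkedQueue<>();

    MemoryAwareSelector(Selector nioSelector, long capacityBytes) {
        this.nioSelector = nioSelector;
        this.availableBytes = new AtomicLong(capacityBytes);
    }

    // called once the 4-byte size prefix of a Receive has been read
    boolean tryReserve(SelectionKey key, int receiveSize) {
        while (true) {
            long avail = availableBytes.get();
            if (avail < receiveSize) {
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); // mute
                mutedForMemory.add(key);
                return false;
            }
            if (availableBytes.compareAndSet(avail, avail - receiveSize))
                return true;
        }
    }

    // called when a request's buffer is released back to the pool
    void release(int receiveSize) {
        availableBytes.addAndGet(receiveSize);
        SelectionKey key;
        while ((key = mutedForMemory.poll()) != null) {
            if (key.isValid())
                key.interestOps(key.interestOps() | SelectionKey.OP_READ); // unmute
        }
        nioSelector.wakeup(); // cut short a potentially long poll() timeout
    }
}

One caveat of this sketch: SelectionKey.interestOps() is called here from the releasing thread while the selector thread may be blocked in select(); a production version would likely hand the unmute over to the selector thread instead.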
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi Jun, 1. yes, it is my own personal opinion that people use queued.max.requests as an indirect way to bound memory consumption. once a more direct memory bound mechanism exists (and works) i dont think queued.max.requests would be required. having said that I was not planning on making any changes w.r.t queued.max.requests support (so I was aiming to get to a situation where both configs are supported) to allow gathering enough data/feedback. 2. Selector.poll() calls into KafkaChannel.read() to maybe get a NetworkReceive. multiple such read() calls may be required until a Receive is produced (this is already the case in the current code base). my pool implementation is non-blocking so if there's no memory available the read() call will return null. poll() would then move on to try and service other selection keys. the pool will be checked for available memory again the next time the SocketServer.run() loop gets to poll(). and so right now I dont communicate memory becoming available to the selector - it will just go on to try and make progress elsewhere and come back again. i never block it or send it to sleep. I think for efficiency what could maybe be done is if there's not enough memory to service a readable selection key we may want to skip all other read-ready selection keys for that iteration of pollSelectionKeys(). that would require rather invasive changes around Selector.pollSelectionKeys() that I'd rather avoid. also different KafkaChannels may be backed by different memory pools (under some sort of future QoS scheme?), which would complicate such an optimization further. 3. i added the pool interface and implementation under kafka.common.memory, and the API is "thin" enough to be generally useful (currently its non-blocking only, but a get(long maxWait) is definitely doable). having said that, I'm not really familiar enough with the code to say On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao wrote: > Hi, Radi, > > Thanks for the update. At the high level, this looks promising. A few > comments below. > > 1. If we can bound the requests by bytes, it seems that we don't need > queued.max.requests > any more? Could we just deprecate the config and make the queue size > unbounded? > 2. How do we communicate back to the selector when some memory is freed up? > We probably need to wake up the selector. For efficiency, perhaps we only > need to wake up the selector if the bufferpool is full? > 3. We talked about bounding the consumer's memory before. To fully support > that, we will need to bound the memory used by different fetch responses in > the consumer. Do you think the changes that you propose here can be > leveraged to bound the memory in the consumer as well? > > Jun > > > On Tue, Aug 30, 2016 at 10:41 AM, radai > wrote: > > > My apologies for the delay in response. > > > > I agree with the concerns about OOM reading from the actual sockets and > > blocking the network threads - messing with the request queue itself > would > > not do. > > > > I propose instead a memory pool approach - the broker would have a non > > blocking memory pool. upon reading the first 4 bytes out of a socket an > > attempt would be made to acquire enough memory and if that attempt fails > > the processing thread will move on to try and make progress with other > > tasks. > > > > I think Its simpler than mute/unmute because using mute/unmute would > > require differentiating between sockets muted due to a request in > progress > > (normal current operation) and sockets muted due to lack of memory.
> sockets > > of the 1st kind would be unmuted at the end of request processing (as it > > happens right now) but the 2nd kind would require some sort of "unmute > > watchdog" which is (i claim) more complicated than a memory pool. also a > > memory pool is a more generic solution. > > > > I've updated the KIP page ( > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) > > to reflect the new proposed implementation, and i've also put up an > inital > > implementation proposal on github - > > https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool. > the > > proposed code is not complete and tested yet (so probably buggy) but does > > include the main points of modification. > > > > the specific implementation of the pool on that branch also has a built > in > > safety net where memory that is acquired but not released (which is a > bug) > > is discovered when the garbage collector frees it and the capacity is > > reclaimed. > > > > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao wrote: > > > > > Radi, > > > > > > Yes, I got the benefit of bounding the request queue by bytes. My > concern > > > is the following if we don't change the behavior of processor blocking > on > > > queue full. > > > > > > If the broker truly doesn't have enough memory for buffering > outstanding > > > requests from all connections, we have to either hit OOM or block the > > > pr
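To make point 2 above concrete, a rough sketch of the non-blocking pool semantics described there (the real interface lives under kafka.common.memory and may differ in detail): an allocation attempt returns null instead of blocking, and the caller simply retries on a later poll() iteration.

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

interface MemoryPool {
    // returns null (rather than blocking) when no memory is available
    ByteBuffer tryAllocate(int sizeBytes);
    void release(ByteBuffer previouslyAllocated);
}

class SimpleMemoryPool implements MemoryPool {
    private final AtomicLong available;

    SimpleMemoryPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    @Override
    public ByteBuffer tryAllocate(int sizeBytes) {
        while (true) {
            long avail = available.get();
            if (avail < sizeBytes)
                return null; // caller (e.g. KafkaChannel.read) retries on the next poll
            if (available.compareAndSet(avail, avail - sizeBytes))
                return ByteBuffer.allocate(sizeBytes);
        }
    }

    @Override
    public void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }
}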
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi, Radi, Thanks for the update. At the high level, this looks promising. A few comments below. 1. If we can bound the requests by bytes, it seems that we don't need queued.max.requests any more? Could we just deprecate the config and make the queue size unbounded? 2. How do we communicate back to the selector when some memory is freed up? We probably need to wake up the selector. For efficiency, perhaps we only need to wake up the selector if the bufferpool is full? 3. We talked about bounding the consumer's memory before. To fully support that, we will need to bound the memory used by different fetch responses in the consumer. Do you think the changes that you propose here can be leveraged to bound the memory in the consumer as well? Jun On Tue, Aug 30, 2016 at 10:41 AM, radai wrote: > My apologies for the delay in response. > > I agree with the concerns about OOM reading from the actual sockets and > blocking the network threads - messing with the request queue itself would > not do. > > I propose instead a memory pool approach - the broker would have a non > blocking memory pool. upon reading the first 4 bytes out of a socket an > attempt would be made to acquire enough memory and if that attempt fails > the processing thread will move on to try and make progress with other > tasks. > > I think Its simpler than mute/unmute because using mute/unmute would > require differentiating between sockets muted due to a request in progress > (normal current operation) and sockets muted due to lack of memory. sockets > of the 1st kind would be unmuted at the end of request processing (as it > happens right now) but the 2nd kind would require some sort of "unmute > watchdog" which is (i claim) more complicated than a memory pool. also a > memory pool is a more generic solution. > > I've updated the KIP page ( > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) > to reflect the new proposed implementation, and i've also put up an inital > implementation proposal on github - > https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool. the > proposed code is not complete and tested yet (so probably buggy) but does > include the main points of modification. > > the specific implementation of the pool on that branch also has a built in > safety net where memory that is acquired but not released (which is a bug) > is discovered when the garbage collector frees it and the capacity is > reclaimed. > > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao wrote: > > > Radi, > > > > Yes, I got the benefit of bounding the request queue by bytes. My concern > > is the following if we don't change the behavior of processor blocking on > > queue full. > > > > If the broker truly doesn't have enough memory for buffering outstanding > > requests from all connections, we have to either hit OOM or block the > > processor. Both will be bad. I am not sure if one is clearly better than > > the other. In this case, the solution is probably to expand the cluster > to > > reduce the per broker request load. > > > > If the broker actually has enough memory, we want to be able to configure > > the request queue in such a way that it never blocks. You can tell people > > to just set the request queue to be unbounded, which may scare them. If > we > > do want to put a bound, it seems it's easier to configure the queue size > > based on # requests. Basically, we can tell people to set the queue size > > based on number of connections. 
If the queue is based on bytes, it's not > > clear how people should set it w/o causing the processor to block. > > > > Finally, Rajini has a good point. The ByteBuffer in the request object is > > allocated as soon as we see the first 4 bytes from the socket. So, I am > not > > sure if just bounding the request queue itself is enough to bound the > > memory related to requests. > > > > Thanks, > > > > Jun > > > > > > > > On Mon, Aug 8, 2016 at 4:46 PM, radai > wrote: > > > > > I agree that filling up the request queue can cause clients to time out > > > (and presumably retry?). However, for the workloads where we expect > this > > > configuration to be useful the alternative is currently an OOM crash. > > > In my opinion an initial implementation of this feature could be > > > constrained to a simple drop-in replacement of ArrayBlockingQueue > > > (conditional, opt-in) and further study of behavior patterns under load > > can > > > drive future changes to the API later when those behaviors are better > > > understood (like back-pressure, nop filler responses to avoid client > > > timeouts or whatever). > > > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat < > > > gharatmayures...@gmail.com> > > > wrote: > > > > > > > Nice write up Radai. > > > > I think what Jun said is a valid concern. > > > > If I am not wrong as per the proposal, we are depending on the entire > > > > pipeline to flow smoothly from accepting requests to h
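On question 2 above, a hedged sketch of the "only wake up the selector if the bufferpool is full" idea (names invented): record whether any allocation failed since the last wakeup, and wake the selector only on the transition from exhausted back to available, avoiding a spurious wakeup on every release.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class WakeupOnFreePool {
    private final AtomicLong available;
    private final AtomicBoolean starved = new AtomicBoolean(false);
    private final Runnable selectorWakeup; // e.g. nioSelector::wakeup

    WakeupOnFreePool(long capacityBytes, Runnable selectorWakeup) {
        this.available = new AtomicLong(capacityBytes);
        this.selectorWakeup = selectorWakeup;
    }

    boolean tryAcquire(int size) {
        while (true) {
            long avail = available.get();
            if (avail < size) {
                starved.set(true); // remember someone went away empty-handed
                return false;
            }
            if (available.compareAndSet(avail, avail - size))
                return true;
        }
    }

    void release(int size) {
        available.addAndGet(size);
        // only wake the selector if an allocation failed since the last wakeup
        if (starved.compareAndSet(true, false))
            selectorWakeup.run();
    }
}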
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
My apologies for the delay in response. I agree with the concerns about OOM reading from the actual sockets and blocking the network threads - messing with the request queue itself would not do. I propose instead a memory pool approach - the broker would have a non-blocking memory pool. upon reading the first 4 bytes out of a socket an attempt would be made to acquire enough memory and if that attempt fails the processing thread will move on to try and make progress with other tasks. I think it's simpler than mute/unmute because using mute/unmute would require differentiating between sockets muted due to a request in progress (normal current operation) and sockets muted due to lack of memory. sockets of the 1st kind would be unmuted at the end of request processing (as it happens right now) but the 2nd kind would require some sort of "unmute watchdog" which is (i claim) more complicated than a memory pool. also a memory pool is a more generic solution. I've updated the KIP page ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) to reflect the new proposed implementation, and i've also put up an initial implementation proposal on github - https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool. the proposed code is not complete and tested yet (so probably buggy) but does include the main points of modification. the specific implementation of the pool on that branch also has a built-in safety net where memory that is acquired but not released (which is a bug) is discovered when the garbage collector frees it and the capacity is reclaimed. On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao wrote: > Radi, > > Yes, I got the benefit of bounding the request queue by bytes. My concern > is the following if we don't change the behavior of processor blocking on > queue full. > > If the broker truly doesn't have enough memory for buffering outstanding > requests from all connections, we have to either hit OOM or block the > processor. Both will be bad. I am not sure if one is clearly better than > the other. In this case, the solution is probably to expand the cluster to > reduce the per broker request load. > > If the broker actually has enough memory, we want to be able to configure > the request queue in such a way that it never blocks. You can tell people > to just set the request queue to be unbounded, which may scare them. If we > do want to put a bound, it seems it's easier to configure the queue size > based on # requests. Basically, we can tell people to set the queue size > based on number of connections. If the queue is based on bytes, it's not > clear how people should set it w/o causing the processor to block. > > Finally, Rajini has a good point. The ByteBuffer in the request object is > allocated as soon as we see the first 4 bytes from the socket. So, I am not > sure if just bounding the request queue itself is enough to bound the > memory related to requests. > > Thanks, > > Jun > > > > On Mon, Aug 8, 2016 at 4:46 PM, radai wrote: > > > I agree that filling up the request queue can cause clients to time out > > (and presumably retry?). However, for the workloads where we expect this > > configuration to be useful the alternative is currently an OOM crash.
> > In my opinion an initial implementation of this feature could be > > constrained to a simple drop-in replacement of ArrayBlockingQueue > > (conditional, opt-in) and further study of behavior patterns under load > can > > drive future changes to the API later when those behaviors are better > > understood (like back-pressure, nop filler responses to avoid client > > timeouts or whatever). > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat < > > gharatmayures...@gmail.com> > > wrote: > > > > > Nice write up Radai. > > > I think what Jun said is a valid concern. > > > If I am not wrong as per the proposal, we are depending on the entire > > > pipeline to flow smoothly from accepting requests to handling it, > calling > > > KafkaApis and handing back the responses. > > > > > > Thanks, > > > > > > Mayuresh > > > > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy > wrote: > > > > > > > > > > > > > . > > > > >> > > > > >> > > > > > Hi Becket, > > > > > > > > > > I don't think progress can be made in the processor's run loop if > the > > > > > queue fills up. i.e., I think Jun's point is that if the queue is > > full > > > > > (either due to the proposed max.bytes or today due to max.requests > > > > hitting > > > > > the limit) then processCompletedReceives will block and no further > > > > progress > > > > > can be made. > > > > > > > > > > > > > I'm sorry - this isn't right. There will be progress as long as the > API > > > > handlers are able to pick requests off the request queue and add the > > > > responses to the response queues (which are effectively unbounded). > > > > However, the point is valid that blocking in the request channel's > put > > > has
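A sketch of the garbage-collector safety net described above, showing only the leak-reclamation path (the linked branch is the actual implementation and also supports explicit release, with bookkeeping keyed by buffer identity): each handed-out buffer is tracked by a weak reference, so a buffer that is acquired but never released eventually surfaces on a ReferenceQueue after GC and its capacity is returned to the pool.

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class GcBackedMemoryPool {
    // weak reference that remembers the size of the buffer it tracked
    private static final class TrackedBuffer extends WeakReference<ByteBuffer> {
        final int size;
        TrackedBuffer(ByteBuffer buf, ReferenceQueue<ByteBuffer> q) {
            super(buf, q);
            this.size = buf.capacity();
        }
    }

    private final AtomicLong available;
    private final ReferenceQueue<ByteBuffer> collected = new ReferenceQueue<>();
    // keeps the TrackedBuffer objects strongly reachable so they get enqueued
    private final Set<TrackedBuffer> outstanding = ConcurrentHashMap.newKeySet();

    GcBackedMemoryPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    ByteBuffer tryAllocate(int size) {
        reclaimLeaked(); // piggyback reclamation on the allocation path
        while (true) {
            long avail = available.get();
            if (avail < size)
                return null; // non-blocking: caller retries later
            if (available.compareAndSet(avail, avail - size)) {
                ByteBuffer buf = ByteBuffer.allocate(size);
                outstanding.add(new TrackedBuffer(buf, collected));
                return buf;
            }
        }
    }

    // a buffer that was acquired but never released (a bug) is eventually
    // GC'd; its weak reference surfaces here and the capacity is reclaimed
    private void reclaimLeaked() {
        Reference<? extends ByteBuffer> ref;
        while ((ref = collected.poll()) != null) {
            TrackedBuffer tb = (TrackedBuffer) ref;
            if (outstanding.remove(tb))
                available.addAndGet(tb.size);
        }
    }
}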
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Radi, Yes, I got the benefit of bounding the request queue by bytes. My concern is the following if we don't change the behavior of processor blocking on queue full. If the broker truly doesn't have enough memory for buffering outstanding requests from all connections, we have to either hit OOM or block the processor. Both will be bad. I am not sure if one is clearly better than the other. In this case, the solution is probably to expand the cluster to reduce the per broker request load. If the broker actually has enough memory, we want to be able to configure the request queue in such a way that it never blocks. You can tell people to just set the request queue to be unbounded, which may scare them. If we do want to put a bound, it seems it's easier to configure the queue size based on # requests. Basically, we can tell people to set the queue size based on number of connections. If the queue is based on bytes, it's not clear how people should set it w/o causing the processor to block. Finally, Rajini has a good point. The ByteBuffer in the request object is allocated as soon as we see the first 4 bytes from the socket. So, I am not sure if just bounding the request queue itself is enough to bound the memory related to requests. Thanks, Jun On Mon, Aug 8, 2016 at 4:46 PM, radai wrote: > I agree that filling up the request queue can cause clients to time out > (and presumably retry?). However, for the workloads where we expect this > configuration to be useful the alternative is currently an OOM crash. > In my opinion an initial implementation of this feature could be > constrained to a simple drop-in replacement of ArrayBlockingQueue > (conditional, opt-in) and further study of behavior patterns under load can > drive future changes to the API later when those behaviors are better > understood (like back-pressure, nop filler responses to avoid client > timeouts or whatever). > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat < > gharatmayures...@gmail.com> > wrote: > > > Nice write up Radai. > > I think what Jun said is a valid concern. > > If I am not wrong as per the proposal, we are depending on the entire > > pipeline to flow smoothly from accepting requests to handling it, calling > > KafkaApis and handing back the responses. > > > > Thanks, > > > > Mayuresh > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy wrote: > > > > > > > > > > . > > > >> > > > >> > > > > Hi Becket, > > > > > > > > I don't think progress can be made in the processor's run loop if the > > > > queue fills up. i.e., I think Jun's point is that if the queue is > full > > > > (either due to the proposed max.bytes or today due to max.requests > > > hitting > > > > the limit) then processCompletedReceives will block and no further > > > progress > > > > can be made. > > > > > > > > > > I'm sorry - this isn't right. There will be progress as long as the API > > > handlers are able to pick requests off the request queue and add the > > > responses to the response queues (which are effectively unbounded). > > > However, the point is valid that blocking in the request channel's put > > has > > > the effect of exacerbating the pressure on the socket server. > > > > > > > > > > > > > >> > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao wrote: > > > >> > > > >> > Radai, > > > >> > > > > >> > Thanks for the proposal. A couple of comments on this. > > > >> > > > > >> > 1. Since we store request objects in the request queue, how do we > > get > > > an > > > >> > accurate size estimate for those requests? 
> > > >> > > > > >> > 2. Currently, it's bad if the processor blocks on adding a request > > to > > > >> the > > > >> > request queue. Once blocked, the processor can't process the > sending > > > of > > > >> > responses of other socket keys either. This will cause all clients > > in > > > >> this > > > >> > processor with an outstanding request to eventually timeout. > > > Typically, > > > >> > this will trigger client-side retries, which will add more load on > > the > > > >> > broker and cause potentially more congestion in the request queue. > > > With > > > >> > queued.max.requests, to prevent blocking on the request queue, our > > > >> > recommendation is to configure queued.max.requests to be the same > as > > > the > > > >> > number of socket connections on the broker. Since the broker never > > > >> > processes more than 1 request per connection at a time, the > request > > > >> queue > > > >> > will never be blocked. With queued.max.bytes, it's going to be > > harder > > > to > > > >> > configure the value properly to prevent blocking. > > > >> > > > > >> > So, while adding queued.max.bytes is potentially useful for memory > > > >> > management, for it to be truly useful, we probably need to address > > the > > > >> > processor blocking issue for it to be really useful in practice. > One > > > >> > possibility is to put back-pressure to the client when the request > > > >> queue is > > > >> > blocked. For example,
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
I like the simplicity of the approach and can see that it is an improvement over the current implementation in typical scenarios. But I would like to see Jun's proposal to mute sockets explored further. With the proposal in the KIP to limit queue size, I am not sure how to calculate the total memory requirements for brokers to avoid OOM, particularly to avoid DoS attacks in a secure cluster. Even though the KIP says the actual memory bound is "queued.max.bytes + socket.request.max.bytes", it seems to me that even with the queue limits, brokers will need close to (socket.request.max.bytes * n) bytes of memory to guarantee they don't OOM (where n is the number of connections). That upper limit is no different to the current implementation. With client quotas that delay responses and client requests that may be retried due to blocking processors, as soon as there is space in the request queue that unblocks the processor, the next select operation could read in a lot of data from a lot of connections. So controlling the amount of data read from the socket could be as important as controlling the request queue size. I don't mind this being done in two stages, but it would be good to understand what the final solution would look like before this one is committed. On Tue, Aug 9, 2016 at 1:58 AM, Becket Qin wrote: > Thought about this again. If I understand correctly Jun's concern is about > the cascading effect. Currently the processor will try to put all the > requests received in one poll() call into the RequestChannel. This could > potentially be long if the queue is moving really really slowly. If we > don't mute the sockets and clients time out then reconnect, the processor > may read more requests from the new sockets and take even longer to put > them into the RequestChannel. So it would be good see if we can avoid this > cascading effects. > > On the other end, allowing the processor to keep reading requests from the > sockets when the RequestChannel is full also seems hurting the memory usage > control effort. > > Thanks, > > Jiangjie (Becket) Qin > > On Mon, Aug 8, 2016 at 4:46 PM, radai wrote: > > > I agree that filling up the request queue can cause clients to time out > > (and presumably retry?). However, for the workloads where we expect this > > configuration to be useful the alternative is currently an OOM crash. > > In my opinion an initial implementation of this feature could be > > constrained to a simple drop-in replacement of ArrayBlockingQueue > > (conditional, opt-in) and further study of behavior patterns under load > can > > drive future changes to the API later when those behaviors are better > > understood (like back-pressure, nop filler responses to avoid client > > timeouts or whatever). > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat < > > gharatmayures...@gmail.com> > > wrote: > > > > > Nice write up Radai. > > > I think what Jun said is a valid concern. > > > If I am not wrong as per the proposal, we are depending on the entire > > > pipeline to flow smoothly from accepting requests to handling it, > calling > > > KafkaApis and handing back the responses. > > > > > > Thanks, > > > > > > Mayuresh > > > > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy > wrote: > > > > > > > > > > > > > . > > > > >> > > > > >> > > > > > Hi Becket, > > > > > > > > > > I don't think progress can be made in the processor's run loop if > the > > > > > queue fills up.
i.e., I think Jun's point is that if the queue is > > full > > > > > (either due to the proposed max.bytes or today due to max.requests > > > > hitting > > > > > the limit) then processCompletedReceives will block and no further > > > > progress > > > > > can be made. > > > > > > > > > > > > > I'm sorry - this isn't right. There will be progress as long as the > API > > > > handlers are able to pick requests off the request queue and add the > > > > responses to the response queues (which are effectively unbounded). > > > > However, the point is valid that blocking in the request channel's > put > > > has > > > > the effect of exacerbating the pressure on the socket server. > > > > > > > > > > > > > > > > > >> > > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao > wrote: > > > > >> > > > > >> > Radai, > > > > >> > > > > > >> > Thanks for the proposal. A couple of comments on this. > > > > >> > > > > > >> > 1. Since we store request objects in the request queue, how do > we > > > get > > > > an > > > > >> > accurate size estimate for those requests? > > > > >> > > > > > >> > 2. Currently, it's bad if the processor blocks on adding a > request > > > to > > > > >> the > > > > >> > request queue. Once blocked, the processor can't process the > > sending > > > > of > > > > >> > responses of other socket keys either. This will cause all > clients > > > in > > > > >> this > > > > >> > processor with an outstanding request to eventually timeout. > > > > Typically, > > > > >> > this will trigger client-side retries, which will add
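As a worked example of the bound Rajini questions above (all numbers invented), the gap between the KIP's nominal bound and the guaranteed-safe budget scales with the connection count: the moment a blocked processor unblocks, a single select pass could read a near-max-size request from every connection before any queue limit applies.

public class WorstCaseMemory {
    public static void main(String[] args) {
        long socketRequestMaxBytes = 100L << 20; // socket.request.max.bytes = 100MB
        int connections = 50;                    // n, live connections on the broker
        long queuedMaxBytes = 500L << 20;        // queued.max.bytes = 500MB

        // nominal bound stated by the KIP
        long nominal = queuedMaxBytes + socketRequestMaxBytes;

        // budget needed to strictly rule out OOM, per the argument above:
        // one near-max request in flight per connection after an unblock
        long worstCase = socketRequestMaxBytes * connections;

        System.out.println("nominal bound: " + (nominal >> 20) + "MB");   // 600MB
        System.out.println("worst case:    " + (worstCase >> 20) + "MB"); // 5000MB
    }
}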
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Thought about this again. If I understand correctly Jun's concern is about the cascading effect. Currently the processor will try to put all the requests received in one poll() call into the RequestChannel. This could potentially be long if the queue is moving really really slowly. If we don't mute the sockets and clients time out then reconnect, the processor may read more requests from the new sockets and take even longer to put them into the RequestChannel. So it would be good to see if we can avoid this cascading effect. On the other hand, allowing the processor to keep reading requests from the sockets when the RequestChannel is full also seems to hurt the memory usage control effort. Thanks, Jiangjie (Becket) Qin On Mon, Aug 8, 2016 at 4:46 PM, radai wrote: > I agree that filling up the request queue can cause clients to time out > (and presumably retry?). However, for the workloads where we expect this > configuration to be useful the alternative is currently an OOM crash. > In my opinion an initial implementation of this feature could be > constrained to a simple drop-in replacement of ArrayBlockingQueue > (conditional, opt-in) and further study of behavior patterns under load can > drive future changes to the API later when those behaviors are better > understood (like back-pressure, nop filler responses to avoid client > timeouts or whatever). > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat < > gharatmayures...@gmail.com> > wrote: > > > Nice write up Radai. > > I think what Jun said is a valid concern. > > If I am not wrong as per the proposal, we are depending on the entire > > pipeline to flow smoothly from accepting requests to handling it, calling > > KafkaApis and handing back the responses. > > > > Thanks, > > > > Mayuresh > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy wrote: > > > > > > > > > > . > > > >> > > > >> > > > > Hi Becket, > > > > > > > > I don't think progress can be made in the processor's run loop if the > > > > queue fills up. i.e., I think Jun's point is that if the queue is > full > > > > (either due to the proposed max.bytes or today due to max.requests > > > hitting > > > > the limit) then processCompletedReceives will block and no further > > > progress > > > > can be made. > > > > > > > > > > I'm sorry - this isn't right. There will be progress as long as the API > > > handlers are able to pick requests off the request queue and add the > > > responses to the response queues (which are effectively unbounded). > > > However, the point is valid that blocking in the request channel's put > > has > > > the effect of exacerbating the pressure on the socket server. > > > > > > > > > > > > > >> > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao wrote: > > > >> > > > >> > Radai, > > > >> > > > > >> > Thanks for the proposal. A couple of comments on this. > > > >> > > > > >> > 1. Since we store request objects in the request queue, how do we > > get > > > an > > > >> > accurate size estimate for those requests? > > > >> > > > > >> > 2. Currently, it's bad if the processor blocks on adding a request > > to > > > >> the > > > >> > request queue. Once blocked, the processor can't process the > sending > > > of > > > >> > responses of other socket keys either. This will cause all clients > > in > > > >> this > > > >> > processor with an outstanding request to eventually timeout. > > > Typically, > > > >> > this will trigger client-side retries, which will add more load on > > the > > > >> > broker and cause potentially more congestion in the request queue.
> > > With > > > >> > queued.max.requests, to prevent blocking on the request queue, our > > > >> > recommendation is to configure queued.max.requests to be the same > as > > > the > > > >> > number of socket connections on the broker. Since the broker never > > > >> > processes more than 1 request per connection at a time, the > request > > > >> queue > > > >> > will never be blocked. With queued.max.bytes, it's going to be > > harder > > > to > > > >> > configure the value properly to prevent blocking. > > > >> > > > > >> > So, while adding queued.max.bytes is potentially useful for memory > > > >> > management, for it to be truly useful, we probably need to address > > the > > > >> > processor blocking issue for it to be really useful in practice. > One > > > >> > possibility is to put back-pressure to the client when the request > > > >> queue is > > > >> > blocked. For example, if the processor notices that the request > > queue > > > is > > > >> > full, it can turn off the interest bit for read for all socket > keys. > > > >> This > > > >> > will allow the processor to continue handling responses. When the > > > >> request > > > >> > queue has space again, it can indicate the new state to the > process > > > and > > > >> > wake up the selector. Not sure how this will work with multiple > > > >> processors > > > >> > though since the request queue is shared across all processors. > > > >> > > > > >> > Thanks,
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
I agree that filling up the request queue can cause clients to time out (and presumably retry?). However, for the workloads where we expect this configuration to be useful the alternative is currently an OOM crash. In my opinion an initial implementation of this feature could be constrained to a simple drop-in replacement of ArrayBlockingQueue (conditional, opt-in) and further study of behavior patterns under load can drive future changes to the API later when those behaviors are better understood (like back-pressure, nop filler responses to avoid client timeouts or whatever). On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat wrote: > Nice write up Radai. > I think what Jun said is a valid concern. > If I am not wrong as per the proposal, we are depending on the entire > pipeline to flow smoothly from accepting requests to handling it, calling > KafkaApis and handing back the responses. > > Thanks, > > Mayuresh > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy wrote: > > > > > > > . > > >> > > >> > > > Hi Becket, > > > > > > I don't think progress can be made in the processor's run loop if the > > > queue fills up. i.e., I think Jun's point is that if the queue is full > > > (either due to the proposed max.bytes or today due to max.requests > > hitting > > > the limit) then processCompletedReceives will block and no further > > progress > > > can be made. > > > > > > > I'm sorry - this isn't right. There will be progress as long as the API > > handlers are able to pick requests off the request queue and add the > > responses to the response queues (which are effectively unbounded). > > However, the point is valid that blocking in the request channel's put > has > > the effect of exacerbating the pressure on the socket server. > > > > > > > > > >> > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao wrote: > > >> > > >> > Radai, > > >> > > > >> > Thanks for the proposal. A couple of comments on this. > > >> > > > >> > 1. Since we store request objects in the request queue, how do we > get > > an > > >> > accurate size estimate for those requests? > > >> > > > >> > 2. Currently, it's bad if the processor blocks on adding a request > to > > >> the > > >> > request queue. Once blocked, the processor can't process the sending > > of > > >> > responses of other socket keys either. This will cause all clients > in > > >> this > > >> > processor with an outstanding request to eventually timeout. > > Typically, > > >> > this will trigger client-side retries, which will add more load on > the > > >> > broker and cause potentially more congestion in the request queue. > > With > > >> > queued.max.requests, to prevent blocking on the request queue, our > > >> > recommendation is to configure queued.max.requests to be the same as > > the > > >> > number of socket connections on the broker. Since the broker never > > >> > processes more than 1 request per connection at a time, the request > > >> queue > > >> > will never be blocked. With queued.max.bytes, it's going to be > harder > > to > > >> > configure the value properly to prevent blocking. > > >> > > > >> > So, while adding queued.max.bytes is potentially useful for memory > > >> > management, for it to be truly useful, we probably need to address > the > > >> > processor blocking issue for it to be really useful in practice. One > > >> > possibility is to put back-pressure to the client when the request > > >> queue is > > >> > blocked. 
For example, if the processor notices that the request > queue > > is > > >> > full, it can turn off the interest bit for read for all socket keys. > > >> This > > >> > will allow the processor to continue handling responses. When the > > >> request > > >> > queue has space again, it can indicate the new state to the process > > and > > >> > wake up the selector. Not sure how this will work with multiple > > >> processors > > >> > though since the request queue is shared across all processors. > > >> > > > >> > Thanks, > > >> > > > >> > Jun > > >> > > > >> > > > >> > > > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai > > >> wrote: > > >> > > > >> > > Hello, > > >> > > > > >> > > I'd like to initiate a discussion about > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > >> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes > > >> > > > > >> > > The goal of the KIP is to allow configuring a bound on the > capacity > > >> (as > > >> > in > > >> > > bytes of memory used) of the incoming request queue, in addition > to > > >> the > > >> > > current bound on the number of messages. > > >> > > > > >> > > This comes after several incidents at Linkedin where a sudden > > "spike" > > >> of > > >> > > large message batches caused an out of memory exception. > > >> > > > > >> > > Thank you, > > >> > > > > >> > >Radai > > >> > > > > >> > > > >> > > > > > > > > > > > > -- > -Regards, > Mayuresh R. Gharat > (862) 250-7125 >
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Nice write up Radai. I think what Jun said is a valid concern. If I am not wrong, as per the proposal, we are depending on the entire pipeline to flow smoothly from accepting requests to handling them, calling KafkaApis and handing back the responses. Thanks, Mayuresh On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy wrote: > > > > . > >> > >> > > Hi Becket, > > > > I don't think progress can be made in the processor's run loop if the > > queue fills up. i.e., I think Jun's point is that if the queue is full > > (either due to the proposed max.bytes or today due to max.requests > hitting > > the limit) then processCompletedReceives will block and no further > progress > > can be made. > > > > I'm sorry - this isn't right. There will be progress as long as the API > handlers are able to pick requests off the request queue and add the > responses to the response queues (which are effectively unbounded). > However, the point is valid that blocking in the request channel's put has > the effect of exacerbating the pressure on the socket server. > > > > > >> > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao wrote: > >> > >> > Radai, > >> > > >> > Thanks for the proposal. A couple of comments on this. > >> > > >> > 1. Since we store request objects in the request queue, how do we get > an > >> > accurate size estimate for those requests? > >> > > >> > 2. Currently, it's bad if the processor blocks on adding a request to > >> the > >> > request queue. Once blocked, the processor can't process the sending > of > >> > responses of other socket keys either. This will cause all clients in > >> this > >> > processor with an outstanding request to eventually timeout. > Typically, > >> > this will trigger client-side retries, which will add more load on the > >> > broker and cause potentially more congestion in the request queue. > With > >> > queued.max.requests, to prevent blocking on the request queue, our > >> > recommendation is to configure queued.max.requests to be the same as > the > >> > number of socket connections on the broker. Since the broker never > >> > processes more than 1 request per connection at a time, the request > >> queue > >> > will never be blocked. With queued.max.bytes, it's going to be harder > to > >> > configure the value properly to prevent blocking. > >> > > >> > So, while adding queued.max.bytes is potentially useful for memory > >> > management, for it to be truly useful, we probably need to address the > >> > processor blocking issue for it to be really useful in practice. One > >> > possibility is to put back-pressure to the client when the request > >> queue is > >> > blocked. For example, if the processor notices that the request queue > is > >> > full, it can turn off the interest bit for read for all socket keys. > >> This > >> > will allow the processor to continue handling responses. When the > >> request > >> > queue has space again, it can indicate the new state to the process > and > >> > wake up the selector. Not sure how this will work with multiple > >> processors > >> > though since the request queue is shared across all processors.
> >> > > >> > Thanks, > >> > > >> > Jun > >> > > >> > > >> > > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai > >> wrote: > >> > > >> > > Hello, > >> > > > >> > > I'd like to initiate a discussion about > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes > >> > > > >> > > The goal of the KIP is to allow configuring a bound on the capacity > >> (as > >> > in > >> > > bytes of memory used) of the incoming request queue, in addition to > >> the > >> > > current bound on the number of messages. > >> > > > >> > > This comes after several incidents at Linkedin where a sudden > "spike" > >> of > >> > > large message batches caused an out of memory exception. > >> > > > >> > > Thank you, > >> > > > >> > >Radai > >> > > > >> > > >> > > > > > -- -Regards, Mayuresh R. Gharat (862) 250-7125
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
> > . >> >> > Hi Becket, > > I don't think progress can be made in the processor's run loop if the > queue fills up. i.e., I think Jun's point is that if the queue is full > (either due to the proposed max.bytes or today due to max.requests hitting > the limit) then processCompletedReceives will block and no further progress > can be made. > I'm sorry - this isn't right. There will be progress as long as the API handlers are able to pick requests off the request queue and add the responses to the response queues (which are effectively unbounded). However, the point is valid that blocking in the request channel's put has the effect of exacerbating the pressure on the socket server. > >> >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao wrote: >> >> > Radai, >> > >> > Thanks for the proposal. A couple of comments on this. >> > >> > 1. Since we store request objects in the request queue, how do we get an >> > accurate size estimate for those requests? >> > >> > 2. Currently, it's bad if the processor blocks on adding a request to >> the >> > request queue. Once blocked, the processor can't process the sending of >> > responses of other socket keys either. This will cause all clients in >> this >> > processor with an outstanding request to eventually timeout. Typically, >> > this will trigger client-side retries, which will add more load on the >> > broker and cause potentially more congestion in the request queue. With >> > queued.max.requests, to prevent blocking on the request queue, our >> > recommendation is to configure queued.max.requests to be the same as the >> > number of socket connections on the broker. Since the broker never >> > processes more than 1 request per connection at a time, the request >> queue >> > will never be blocked. With queued.max.bytes, it's going to be harder to >> > configure the value properly to prevent blocking. >> > >> > So, while adding queued.max.bytes is potentially useful for memory >> > management, for it to be truly useful, we probably need to address the >> > processor blocking issue for it to be really useful in practice. One >> > possibility is to put back-pressure to the client when the request >> queue is >> > blocked. For example, if the processor notices that the request queue is >> > full, it can turn off the interest bit for read for all socket keys. >> This >> > will allow the processor to continue handling responses. When the >> request >> > queue has space again, it can indicate the new state to the process and >> > wake up the selector. Not sure how this will work with multiple >> processors >> > though since the request queue is shared across all processors. >> > >> > Thanks, >> > >> > Jun >> > >> > >> > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai >> wrote: >> > >> > > Hello, >> > > >> > > I'd like to initiate a discussion about >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes >> > > >> > > The goal of the KIP is to allow configuring a bound on the capacity >> (as >> > in >> > > bytes of memory used) of the incoming request queue, in addition to >> the >> > > current bound on the number of messages. >> > > >> > > This comes after several incidents at Linkedin where a sudden "spike" >> of >> > > large message batches caused an out of memory exception. >> > > >> > > Thank you, >> > > >> > >Radai >> > > >> > >> > >
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
> > > > 2. Good point about the consequence when the processor threads are > blocking. I agree it would be important to keep the processor thread > running, but I am not sure if it would be a problem of the current > proposal. In most of the time, the request queue should be close to empty, > so the processor won't block. In cases where the queue is throttled because > of the bytes limit, the processor thread should only slow down but not > block for long, i.e. they will be able to make progress as fast as the > request handler threads. At that point, the broker is already processing > the requests at maximum speed. If the clients experiences timeout, the same > thing will likely happen even without the byte limit or we disable READ > from the sockets. The only difference is that the broker won't have OOM if > we have the bytes limit. > > Hi Becket, I don't think progress can be made in the processor's run loop if the queue fills up. i.e., I think Jun's point is that if the queue is full (either due to the proposed max.bytes or today due to max.requests hitting the limit) then processCompletedReceives will block and no further progress can be made. Joel > > On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao wrote: > > > Radai, > > > > Thanks for the proposal. A couple of comments on this. > > > > 1. Since we store request objects in the request queue, how do we get an > > accurate size estimate for those requests? > > > > 2. Currently, it's bad if the processor blocks on adding a request to the > > request queue. Once blocked, the processor can't process the sending of > > responses of other socket keys either. This will cause all clients in > this > > processor with an outstanding request to eventually timeout. Typically, > > this will trigger client-side retries, which will add more load on the > > broker and cause potentially more congestion in the request queue. With > > queued.max.requests, to prevent blocking on the request queue, our > > recommendation is to configure queued.max.requests to be the same as the > > number of socket connections on the broker. Since the broker never > > processes more than 1 request per connection at a time, the request queue > > will never be blocked. With queued.max.bytes, it's going to be harder to > > configure the value properly to prevent blocking. > > > > So, while adding queued.max.bytes is potentially useful for memory > > management, for it to be truly useful, we probably need to address the > > processor blocking issue for it to be really useful in practice. One > > possibility is to put back-pressure to the client when the request queue > is > > blocked. For example, if the processor notices that the request queue is > > full, it can turn off the interest bit for read for all socket keys. This > > will allow the processor to continue handling responses. When the request > > queue has space again, it can indicate the new state to the process and > > wake up the selector. Not sure how this will work with multiple > processors > > though since the request queue is shared across all processors. 
> > > > Thanks, > > > > Jun > > > > > > > > On Thu, Aug 4, 2016 at 11:28 AM, radai > wrote: > > > > > Hello, > > > > > > I'd like to initiate a discussion about > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes > > > > > > The goal of the KIP is to allow configuring a bound on the capacity (as > > in > > > bytes of memory used) of the incoming request queue, in addition to the > > > current bound on the number of messages. > > > > > > This comes after several incidents at Linkedin where a sudden "spike" > of > > > large message batches caused an out of memory exception. > > > > > > Thank you, > > > > > >Radai > > > > > >
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi Jun, 1. The requests in the queue are RequestChannel.Request. It contains the raw ByteBuffer from the socket, we can probably just use that. This might not be 100% accurate about the memory taken, but probably is good enough. 2. Good point about the consequence when the processor threads are blocking. I agree it would be important to keep the processor thread running, but I am not sure if it would be a problem of the current proposal. Most of the time, the request queue should be close to empty, so the processor won't block. In cases where the queue is throttled because of the bytes limit, the processor thread should only slow down but not block for long, i.e. they will be able to make progress as fast as the request handler threads. At that point, the broker is already processing the requests at maximum speed. If the clients experience timeouts, the same thing will likely happen even without the byte limit or if we disable READ from the sockets. The only difference is that the broker won't hit OOM if we have the bytes limit. Thanks, Jiangjie (Becket) Qin On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao wrote: > Radai, > > Thanks for the proposal. A couple of comments on this. > > 1. Since we store request objects in the request queue, how do we get an > accurate size estimate for those requests? > > 2. Currently, it's bad if the processor blocks on adding a request to the > request queue. Once blocked, the processor can't process the sending of > responses of other socket keys either. This will cause all clients in this > processor with an outstanding request to eventually timeout. Typically, > this will trigger client-side retries, which will add more load on the > broker and cause potentially more congestion in the request queue. With > queued.max.requests, to prevent blocking on the request queue, our > recommendation is to configure queued.max.requests to be the same as the > number of socket connections on the broker. Since the broker never > processes more than 1 request per connection at a time, the request queue > will never be blocked. With queued.max.bytes, it's going to be harder to > configure the value properly to prevent blocking. > > So, while adding queued.max.bytes is potentially useful for memory > management, for it to be truly useful, we probably need to address the > processor blocking issue for it to be really useful in practice. One > possibility is to put back-pressure to the client when the request queue is > blocked. For example, if the processor notices that the request queue is > full, it can turn off the interest bit for read for all socket keys. This > will allow the processor to continue handling responses. When the request > queue has space again, it can indicate the new state to the process and > wake up the selector. Not sure how this will work with multiple processors > though since the request queue is shared across all processors. > > Thanks, > > Jun > > > > On Thu, Aug 4, 2016 at 11:28 AM, radai wrote: > > > Hello, > > > > I'd like to initiate a discussion about > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes > > > > The goal of the KIP is to allow configuring a bound on the capacity (as > in > > bytes of memory used) of the incoming request queue, in addition to the > > current bound on the number of messages. > > > > This comes after several incidents at Linkedin where a sudden "spike" of > > large message batches caused an out of memory exception. > > > > Thank you, > > > >Radai > > >
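A sketch of the size estimate suggested in point 1 above, using a simplified Java stand-in for RequestChannel.Request (the real class is Scala and carries more state): charge each queued request the capacity of its raw socket buffer plus the 4-byte length prefix, accepting that per-object overhead is ignored.

import java.nio.ByteBuffer;
import java.util.function.ToLongFunction;

class RequestSizing {
    // simplified stand-in for RequestChannel.Request
    static class Request {
        final ByteBuffer body; // raw bytes as read from the socket
        Request(ByteBuffer body) { this.body = body; }
    }

    // not an exact accounting of heap usage (object headers, parsed fields,
    // etc. are ignored) but, as argued above, probably good enough
    static final ToLongFunction<Request> SIZE_FUNCTION =
            r -> 4L + r.body.capacity(); // 4-byte length prefix + payload
}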
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Radai, Thanks for the proposal. A couple of comments on this. 1. Since we store request objects in the request queue, how do we get an accurate size estimate for those requests? 2. Currently, it's bad if the processor blocks on adding a request to the request queue. Once blocked, the processor can't process the sending of responses of other socket keys either. This will cause all clients in this processor with an outstanding request to eventually timeout. Typically, this will trigger client-side retries, which will add more load on the broker and cause potentially more congestion in the request queue. With queued.max.requests, to prevent blocking on the request queue, our recommendation is to configure queued.max.requests to be the same as the number of socket connections on the broker. Since the broker never processes more than 1 request per connection at a time, the request queue will never be blocked. With queued.max.bytes, it's going to be harder to configure the value properly to prevent blocking. So, while adding queued.max.bytes is potentially useful for memory management, we probably need to address the processor blocking issue for it to be really useful in practice. One possibility is to put back-pressure to the client when the request queue is blocked. For example, if the processor notices that the request queue is full, it can turn off the interest bit for read for all socket keys. This will allow the processor to continue handling responses. When the request queue has space again, it can indicate the new state to the processor and wake up the selector. Not sure how this will work with multiple processors though since the request queue is shared across all processors. Thanks, Jun On Thu, Aug 4, 2016 at 11:28 AM, radai wrote: > Hello, > > I'd like to initiate a discussion about > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes > > The goal of the KIP is to allow configuring a bound on the capacity (as in > bytes of memory used) of the incoming request queue, in addition to the > current bound on the number of messages. > > This comes after several incidents at Linkedin where a sudden "spike" of > large message batches caused an out of memory exception. > > Thank you, > >Radai >
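A rough sketch of the back-pressure idea in the message above, with invented names: mute reads on all of a processor's keys when the shared queue is full, then unmute and wake the selector when space frees up. The multi-processor question is exactly what this glosses over: every processor sharing the queue would need the same signal.

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

class ProcessorBackpressure {
    private final Selector selector;
    private final AtomicBoolean readsMuted = new AtomicBoolean(false);

    ProcessorBackpressure(Selector selector) { this.selector = selector; }

    // called from the processor's run loop when the request queue is full
    void onQueueFull() {
        if (readsMuted.compareAndSet(false, true)) {
            for (SelectionKey key : selector.keys())
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        }
    }

    // called when the request queue has space again, possibly from another
    // thread; iterating selector.keys() is simplified here and would need
    // care against concurrent (de)registration in real code
    void onQueueHasSpace() {
        if (readsMuted.compareAndSet(true, false)) {
            for (SelectionKey key : selector.keys())
                if (key.isValid())
                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            selector.wakeup(); // don't wait out a long poll() timeout
        }
    }
}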
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Supporting both configs at the same time to begin with would allow enough time for users to experiment with the settings. If in the future it's deemed that a memory bound is far superior to a #req bound (which is my opinion but needs evidence to support) the migration discussion could be deferred to that point, making immediate impact tiny (conf upgrade is trivial, behavior remains backwards compatible, and new feature is "opt-in"). I will extend the unit tests (and fix the bugs) as part of the implementation. As for the performance implications: 1. my initial runs (https://github.com/radai-rosenblatt/kafka-benchmarks) show ~5Mops/sec using the current queue (ArrayBlockingQueueBenchmark) and ~400Kops/sec using the proposed queue datastructure (ByteBoundedBlockingQueueBenchmark). this is using 16 producer threads and 24 consumer threads on my machine. 2. ByteBoundedBlockingQueue uses composition on top of ArrayBlockingQueue, since ArrayBlockingQueue is not easily extendable. this leads to the double locking you mentioned. an alternative would be to basically copy-paste ArrayBlockingQueue and add the functionality (as done in https://github.com/radai-rosenblatt/kafka-benchmarks/blob/master/src/main/java/net/radai/kafka/MetricBoundedArrayBlockingQueue.java). this showed no noticeable impact on queue performance (also covered in my benchmark code) but is far less elegant. 3. 400Kops/sec is still more than enough for realistic kafka installations, and the effect can be further mitigated by using ByteBoundedBlockingQueue only if the configuration is set (so no impact if the feature is "opt in" on default installations) 4. As part of the implementation I will also deploy the proposed code in a test env here (linkedin) and report back on measured throughput vs a vanilla version On Fri, Aug 5, 2016 at 8:38 AM, Ismael Juma wrote: > Hi Radai, > > Thanks for the proposal. I think it makes sense to be able to limit the > request queue by bytes. I haven't made up my mind on whether having both > limits is better than having a single one yet, but we probably need to make > a call on that before we can start a vote. > > Just a quick point with regards to ByteBoundedBlockingQueue. It's not > currently used in the code and the existing unit tests are very basic. > Also, looking at the implementation, it has two obvious bugs (it fails if > one passes None for `sizeFunction` and `currentByteSize` is not final and > it should be to comply with the JMM). So, we probably need to extend the > unit tests for that class if we want to use it (ideally there would be > multi-threaded tests too). It also is a bit inefficient as its adds > additional locks on top of the LinkedBlockingQueue ones, but the benchmarks > should hopefully show if that is something that needs to be addressed or > not. > > Ismael > > On Thu, Aug 4, 2016 at 7:28 PM, radai wrote: > > > Hello, > > > > I'd like to initiate a discussion about > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes > > > > The goal of the KIP is to allow configuring a bound on the capacity (as > in > > bytes of memory used) of the incoming request queue, in addition to the > > current bound on the number of messages. > > > > This comes after several incidents at Linkedin where a sudden "spike" of > > large message batches caused an out of memory exception. > > > > Thank you, > > > >Radai > > >
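To make the "double locking" in point 2 concrete, a minimal sketch of the composition approach (illustrative, not the benchmarked class): every put/take pays for both a byte-accounting monitor and ArrayBlockingQueue's own internal lock, which is consistent with the throughput gap in the numbers above.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.ToLongFunction;

class ByteBoundedQueueSketch<E> {
    private final ArrayBlockingQueue<E> inner;
    private final long maxBytes;
    private final ToLongFunction<E> sizeFunction;
    private long currentBytes = 0; // guarded by "this"

    ByteBoundedQueueSketch(int maxElements, long maxBytes, ToLongFunction<E> sizeFunction) {
        this.inner = new ArrayBlockingQueue<>(maxElements);
        this.maxBytes = maxBytes;
        this.sizeFunction = sizeFunction;
    }

    void put(E e) throws InterruptedException {
        long size = sizeFunction.applyAsLong(e);
        synchronized (this) {              // lock #1: byte accounting
            // let a single oversized element through so it can't deadlock
            while (currentBytes > 0 && currentBytes + size > maxBytes)
                wait();
            currentBytes += size;
        }
        inner.put(e);                      // lock #2: ArrayBlockingQueue's internal lock
    }

    E take() throws InterruptedException {
        E e = inner.take();
        synchronized (this) {
            currentBytes -= sizeFunction.applyAsLong(e);
            notifyAll();                   // byte budget freed; wake blocked producers
        }
        return e;
    }
}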
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Thanks for the proposal, Radai. +1

On Fri, Aug 5, 2016 at 8:38 AM, Ismael Juma wrote:
> [...]
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
Hi Radai,

Thanks for the proposal. I think it makes sense to be able to limit the request queue by bytes. I haven't made up my mind on whether having both limits is better than having a single one yet, but we probably need to make a call on that before we can start a vote.

Just a quick point with regards to ByteBoundedBlockingQueue. It's not currently used in the code and the existing unit tests are very basic. Also, looking at the implementation, it has two obvious bugs: it fails if one passes None for `sizeFunction`, and `currentByteSize` is not final when it should be to comply with the JMM (see the illustration below). So, we probably need to extend the unit tests for that class if we want to use it (ideally there would be multi-threaded tests too). It is also a bit inefficient, as it adds additional locks on top of the LinkedBlockingQueue ones, but the benchmarks should hopefully show whether that is something that needs to be addressed or not.

Ismael

On Thu, Aug 4, 2016 at 7:28 PM, radai wrote:
> [...]
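A Java illustration of the JMM point above (the real class is Scala, but the safe-publication rule is the same); both class names are invented for the example:

import java.util.concurrent.atomic.AtomicInteger;

class UnsafePublication {
    // not final: a thread that obtains the enclosing object through a data
    // race is allowed to observe this field as null
    private AtomicInteger currentByteSize = new AtomicInteger(0);
}

class SafePublication {
    // final: the JMM guarantees the field is fully initialized and visible
    // once the constructor completes, even under racy publication
    private final AtomicInteger currentByteSize = new AtomicInteger(0);
}

Only final fields get this construction-time visibility guarantee; any non-final field read without synchronization is a potential source of subtle, hard-to-reproduce bugs in a queue shared across processor threads.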
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
The proposal makes sense to me. I like the plan to support both limits simultaneously: queued.max.requests is supported in addition to queued.max.bytes (both respected at the same time). In this case a default value of queued.max.bytes = -1 would maintain backwards compatible behavior (an example configuration follows below), especially since ByteBoundedBlockingQueue already supports a limit on both byte and element count.

I actually don't mind the property naming of queued.max.*. I am not sure the added clarity of requestQueue.max.* is worth the migration.

On Fri, Aug 5, 2016 at 9:34 AM, Tom Crayford wrote:
> [...]

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
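For illustration, an opted-in broker configuration under this proposal might look like the following; the queued.max.bytes name and the -1 default are as proposed in the KIP (not yet final), and 500 is the existing queued.max.requests default:

# existing bound on the number of queued requests (current default: 500)
queued.max.requests=500

# proposed bound on queued request bytes (here ~100 MB); the proposed
# default of -1 disables the byte bound entirely, so unmodified
# installations keep today's behavior
queued.max.bytes=104857600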
Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes
This makes good sense to me, and seems to have very little downside with a lot of upside. +1

On Thursday, 4 August 2016, radai wrote:
> [...]