Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-11-04 Thread radai
here are my proposed changes:

https://github.com/radai-rosenblatt/kafka/commit/8d7744ab8a6c660c4749b495b033b948a68efd3c

At this point I've run this code on a test cluster under load that OOMs
"vanilla" 0.10.1.0, and verified that my patched code deployed under the same
conditions remains stable.

What I've done:

1. Configure max heap size to 1.5GB and a single I/O thread (makes it easier
   to DOS).
2. Set up a topic with 100 partitions, all on the same broker (makes it easier
   to focus the I/O):
   ./kafka-topics.sh --zookeeper  --create --topic dos --replica-assignment [100 times the same broker id]
3. Spin up load from 10 machines:
   ./kafka-producer-perf-test.sh --topic dos --num-records 100 --record-size 991600 --throughput 10 --producer-props bootstrap.servers= max.request.size=104857600 acks=0 linger.ms=3 buffer.memory=209715200 batch.size=1048576

This would result in single requests that are just under 100MB in size, times
10 producers for a ~1GB max outstanding memory requirement. On my setup this
was enough to reliably DOS vanilla 0.10.1.0; under my patch the broker held up
(the request rate was throttled instead).

Performance when not under memory load was roughly the same (note that the
longest run was ~1 hour; I haven't done long-term stress tests yet).

At this point I think I've addressed most (all?) of the concerns and would
like to move on to a vote. (Obviously the code has not been reviewed yet, but
in terms of the high-level approach and the changes to the public API the KIP
is ready.)




On Sun, Oct 30, 2016 at 5:05 PM, radai  wrote:

> Hi Jun,
>
> the benchmarks just spawn 16 threads where each thread allocates a chunk
> of memory from the pool and immediately releases it. 16 was chosen because
> its typical for LinkedIn setups. the benchmarks never "consume" more than
> 16 * [single allocation size] and so do not test out-of-memory performance,
> but rather "normal" operating conditions. tests were run with 4 memory
> allocation sizes - 1k, 10k, 100k and 1M (1M being the largest typical
> single request size setting at LinkedIn). the results are in ops/sec (for
> context - a single request involves a single allocation/release cycle,
> typical LinkedIn setups dont go beyond 20k requests/sec on a single broker).
>
> results show that the GC pool (which is a combination of an AtomicLong
> outstanding bytes count + weak references for allocated buffers) has a
> negligible performance cost vs the simple benchmark (which does nothing,
> same as current code).
>
> the more interesting thing that the results show is that as the requested
> buffer size gets larger a single allocate/release cycle becomes more
> expensive. since the benchmark never hold a lot of outstanding memory (16 *
> buf size tops) i suspect the issue is memory fragmentation - its harder to
> find larger contiguous chunks of heap.
>
> this indicates that for throughput scenarios (large request batches)
> broker performance may actually be impacted by the overhead of allocating
> and releasing buffers (the situation may even be worse - inter-broker
> requests are much larger), and an implementation of memory pool that
> actually recycles buffers (mine just acts as a limiter and leak detector)
> might improve broker performance under high throughput conditions (but
> thats probably a separate followup change).
>
> I expect to stress test my code this week (though no guarantees).
>
> I'll look at KIP-81.
>
> On Sun, Oct 30, 2016 at 12:27 PM, Jun Rao  wrote:
>
>> Hi, Radai,
>>
>> Sorry for the late response. How should the benchmark results be
>> interpreted? The higher the ops/s, the better? It would also be useful to
>> test this out on LinkedIn's traffic with enough socket connections to see
>> if there is any performance degradation.
>>
>> Also, there is a separate proposal KIP-81 to bound the consumer memory
>> usage. Perhaps you can chime it there on whether this proposal can be
>> utilized there too.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+
>> Bound+Fetch+memory+usage+in+the+consumer
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 27, 2016 at 10:23 AM, radai 
>> wrote:
>>
>> > Hi Jun,
>> >
>> > 10 - mute/unmute functionality has been added in
>> > https://github.com/radai-rosenblatt/kafka/tree/broker-
>> > memory-pool-with-muting.
>> > I have yet to run stress tests to see how it behaves versus without
>> muting
>> >
>> > 11 - I've added a SimplePool implementation (nothing more than an
>> > AtomicLong really) and compared it with my GC pool (that uses weak
>> refs) -
>> > https://github.com/radai-rosenblatt/kafka-benchmarks/
>> > tree/master/memorypool-benchmarks.
>> > the results show no noticeable difference. what the results _do_ show
>> > though is that for large requests (1M) performance drops very sharply.
>> > since the SimplePool is essentially identical to current kafka code
>> > behaviour (the nechmark never reaches out of memory conditions) it would
>> > suggest to me that kafka performance for large request suffers greatly
>> from
>> > the cost of alloc

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-10-30 Thread radai
Hi Jun,

The benchmarks just spawn 16 threads where each thread allocates a chunk of
memory from the pool and immediately releases it. 16 was chosen because it's
typical for LinkedIn setups. The benchmarks never "consume" more than 16 *
[single allocation size] and so do not test out-of-memory performance, but
rather "normal" operating conditions. Tests were run with 4 memory allocation
sizes - 1k, 10k, 100k and 1M (1M being the largest typical single request size
setting at LinkedIn). The results are in ops/sec (for context, a single request
involves a single allocation/release cycle, and typical LinkedIn setups don't
go beyond 20k requests/sec on a single broker).
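
To make the shape of that benchmark concrete, here is a minimal JMH-style
sketch of the allocate/release cycle just described. The inlined AtomicLong
"pool" is a stand-in I'm using for illustration - it is not the actual
SimpleMemoryPool/GarbageCollectedMemoryPool code from the benchmarks repo:

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@Threads(16) // matches the 16 allocator threads described above
public class MemoryPoolBenchmarkSketch {

    @Param({"1024", "10240", "102400", "1048576"}) // 1k / 10k / 100k / 1M
    public int allocationSize;

    // stand-in for the real pool: just an outstanding-bytes counter
    private final AtomicLong available = new AtomicLong(Long.MAX_VALUE);

    @Benchmark
    public void allocateRelease(Blackhole bh) {
        available.addAndGet(-allocationSize);   // "allocate": claim the bytes
        ByteBuffer buf = ByteBuffer.allocate(allocationSize);
        bh.consume(buf);                        // keep the allocation from being optimized away
        available.addAndGet(allocationSize);    // "release" immediately, so at most
                                                // 16 * allocationSize is ever outstanding
    }
}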

The results show that the GC pool (which is a combination of an AtomicLong
outstanding-bytes count plus weak references for allocated buffers) has a
negligible performance cost vs. the simple benchmark (which does nothing,
same as the current code).

The more interesting thing the results show is that as the requested buffer
size gets larger, a single allocate/release cycle becomes more expensive. Since
the benchmark never holds a lot of outstanding memory (16 * buffer size, tops),
I suspect the issue is memory fragmentation - it's harder to find larger
contiguous chunks of heap.

This indicates that for throughput scenarios (large request batches) broker
performance may actually be impacted by the overhead of allocating and
releasing buffers (the situation may even be worse - inter-broker requests are
much larger), and a memory pool implementation that actually recycles buffers
(mine just acts as a limiter and leak detector) might improve broker
performance under high-throughput conditions (but that's probably a separate
follow-up change).

I expect to stress test my code this week (though no guarantees).

I'll look at KIP-81.

On Sun, Oct 30, 2016 at 12:27 PM, Jun Rao  wrote:

> Hi, Radai,
>
> Sorry for the late response. How should the benchmark results be
> interpreted? The higher the ops/s, the better? It would also be useful to
> test this out on LinkedIn's traffic with enough socket connections to see
> if there is any performance degradation.
>
> Also, there is a separate proposal KIP-81 to bound the consumer memory
> usage. Perhaps you can chime it there on whether this proposal can be
> utilized there too.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>
> Thanks,
>
> Jun
>
> On Tue, Sep 27, 2016 at 10:23 AM, radai 
> wrote:
>
> > Hi Jun,
> >
> > 10 - mute/unmute functionality has been added in
> > https://github.com/radai-rosenblatt/kafka/tree/broker-
> > memory-pool-with-muting.
> > I have yet to run stress tests to see how it behaves versus without
> muting
> >
> > 11 - I've added a SimplePool implementation (nothing more than an
> > AtomicLong really) and compared it with my GC pool (that uses weak refs)
> -
> > https://github.com/radai-rosenblatt/kafka-benchmarks/
> > tree/master/memorypool-benchmarks.
> > the results show no noticeable difference. what the results _do_ show
> > though is that for large requests (1M) performance drops very sharply.
> > since the SimplePool is essentially identical to current kafka code
> > behaviour (the nechmark never reaches out of memory conditions) it would
> > suggest to me that kafka performance for large request suffers greatly
> from
> > the cost of allocating (and releasing) large buffers (instead of actually
> > pooling them for later re-use). this means that an implementation of
> memory
> > pool that actually pools ( :-) ) is very likely to improve broker
> > performance for large requests.
> >
> > 12 - if there was a single thread iterating over selection keys then
> > stopping at 1st unsatisfiable request might work (if iteration order over
> > selection keys is deterministic, which is OS-dependent). however, kafka
> > spawns multiple selectors sharing the same pool so i doubt the approach
> > would gain anything. also notice that the current code already shuffles
> the
> > selection keys if memory is low (<10%) to try and guarantee fairness.
> >
> > attached the benchmark results for the pool implementations:
> >
> > BenchmarkMode  Cnt
> > ScoreError  Units
> > GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt5
> > 198272.519 ±  16045.965  ops/s
> > GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt5
> > 2781439.307 ± 185287.072  ops/s
> > GarbageCollectedMemoryPoolBenchmark.alloc_1kthrpt5
> > 6029199.952 ± 465936.118  ops/s
> > GarbageCollectedMemoryPoolBenchmark.alloc_1mthrpt5
> > 18464.272 ±332.861  ops/s
> > SimpleMemoryPoolBenchmark.alloc_100kthrpt5
> > 204240.066 ±   2207.619  ops/s
> > SimpleMemoryPoolBenchmark.alloc_10k thrpt5
> > 3000794.525 ±  83510.836  ops/s
> > SimpleMemoryPoolBenchmark.alloc_1k  thrpt5
> > 5893671.778 ± 274239.541  ops/s
> > SimpleMemoryPoolBenchmark.alloc_1m  thrpt  

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-10-30 Thread Jun Rao
Hi, Radai,

Sorry for the late response. How should the benchmark results be
interpreted? The higher the ops/s, the better? It would also be useful to
test this out on LinkedIn's traffic with enough socket connections to see
if there is any performance degradation.

Also, there is a separate proposal, KIP-81, to bound the consumer's memory
usage. Perhaps you can chime in there on whether this proposal can be utilized
there too.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Thanks,

Jun

On Tue, Sep 27, 2016 at 10:23 AM, radai  wrote:

> Hi Jun,
>
> 10 - mute/unmute functionality has been added in
> https://github.com/radai-rosenblatt/kafka/tree/broker-
> memory-pool-with-muting.
> I have yet to run stress tests to see how it behaves versus without muting
>
> 11 - I've added a SimplePool implementation (nothing more than an
> AtomicLong really) and compared it with my GC pool (that uses weak refs) -
> https://github.com/radai-rosenblatt/kafka-benchmarks/
> tree/master/memorypool-benchmarks.
> the results show no noticeable difference. what the results _do_ show
> though is that for large requests (1M) performance drops very sharply.
> since the SimplePool is essentially identical to current kafka code
> behaviour (the nechmark never reaches out of memory conditions) it would
> suggest to me that kafka performance for large request suffers greatly from
> the cost of allocating (and releasing) large buffers (instead of actually
> pooling them for later re-use). this means that an implementation of memory
> pool that actually pools ( :-) ) is very likely to improve broker
> performance for large requests.
>
> 12 - if there was a single thread iterating over selection keys then
> stopping at 1st unsatisfiable request might work (if iteration order over
> selection keys is deterministic, which is OS-dependent). however, kafka
> spawns multiple selectors sharing the same pool so i doubt the approach
> would gain anything. also notice that the current code already shuffles the
> selection keys if memory is low (<10%) to try and guarantee fairness.
>
> attached the benchmark results for the pool implementations:
>
> Benchmark                                        Mode  Cnt        Score        Error  Units
> GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
> GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
> SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
> SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
> SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
> SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s
>
>
>
> On Sat, Sep 24, 2016 at 9:07 AM, Jun Rao  wrote:
>
> > Hi, Radi,
> >
> > For 10, yes, we don't want the buffer pool to wake up the selector every
> > time some memory is freed up. We only want to do that when there is
> pending
> > requests to the buffer pool not honored due to not enough memory.
> >
> > For 11, we probably want to be a bit careful with Weak References. In
> > https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an
> > implementation based on Weak Reference, but abandoned it due to too much
> GC
> > overhead. It probably also makes the code a bit harder to understand. So,
> > perhaps it would be better if we can avoid it.
> >
> > For 12, that's a good point. I thought the selector will do some
> shuffling
> > for fairness. Perhaps we should stop allocating from the buffer pool when
> > we see the first key whose memory can't be honored?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Sat, Sep 24, 2016 at 8:44 AM, radai 
> wrote:
> >
> > > Hi Jun,
> > >
> > > 10 - I'll add this functionality to the mute/unmute branch. as every
> > > mute/unmute operation is O(#connections / #selectorThreads) maybe a
> > > watermark approach is better than waking when _any_ mem is available?
> > >
> > > 11 - "gc notifications" are done by using a ReferenceQueue (
> > > https://docs.oracle.com/javase/8/docs/api/java/lang/
> > > ref/ReferenceQueue.html)
> > > in combination with weak references to allocated buffers. when a buffer
> > is
> > > reclaimed by the GC the corresponding weak ref to it is enqueued. the
> > pool
> > > maintains a set of outstanding buffer IDs (every allocated buffer gets
> a
> > > unique id - basically a sequence). a buffer explicitly returned has its
> > id
> > > removed from the tracking set and the weak reference to it destroyed,
> so
> > > its reference will never be enqueued by the GC even if it is GC'ed
> later.
> > > an enqueued reference (which indicates a buffer not returned to 

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-27 Thread radai
Hi Jun,

10 - mute/unmute functionality has been added in
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting.
I have yet to run stress tests to see how it behaves versus the version without
muting.

11 - I've added a SimplePool implementation (nothing more than an AtomicLong,
really) and compared it with my GC pool (which uses weak refs) -
https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks.
The results show no noticeable difference. What the results _do_ show, though,
is that for large requests (1M) performance drops very sharply. Since the
SimplePool is essentially identical to current Kafka code behaviour (the
benchmark never reaches out-of-memory conditions), it suggests to me that Kafka
performance for large requests suffers greatly from the cost of allocating (and
releasing) large buffers (instead of actually pooling them for later re-use).
This means that a memory pool implementation that actually pools ( :-) ) is
very likely to improve broker performance for large requests.

12 - if there were a single thread iterating over selection keys, then stopping
at the first unsatisfiable request might work (if iteration order over
selection keys is deterministic, which is OS-dependent). However, Kafka spawns
multiple selectors sharing the same pool, so I doubt the approach would gain
anything. Also notice that the current code already shuffles the selection keys
if memory is low (<10%) to try and guarantee fairness.

Attached are the benchmark results for the pool implementations:

Benchmark                                        Mode  Cnt        Score        Error  Units
GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s



On Sat, Sep 24, 2016 at 9:07 AM, Jun Rao  wrote:

> Hi, Radi,
>
> For 10, yes, we don't want the buffer pool to wake up the selector every
> time some memory is freed up. We only want to do that when there is pending
> requests to the buffer pool not honored due to not enough memory.
>
> For 11, we probably want to be a bit careful with Weak References. In
> https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an
> implementation based on Weak Reference, but abandoned it due to too much GC
> overhead. It probably also makes the code a bit harder to understand. So,
> perhaps it would be better if we can avoid it.
>
> For 12, that's a good point. I thought the selector will do some shuffling
> for fairness. Perhaps we should stop allocating from the buffer pool when
> we see the first key whose memory can't be honored?
>
> Thanks,
>
> Jun
>
>
> On Sat, Sep 24, 2016 at 8:44 AM, radai  wrote:
>
> > Hi Jun,
> >
> > 10 - I'll add this functionality to the mute/unmute branch. as every
> > mute/unmute operation is O(#connections / #selectorThreads) maybe a
> > watermark approach is better than waking when _any_ mem is available?
> >
> > 11 - "gc notifications" are done by using a ReferenceQueue (
> > https://docs.oracle.com/javase/8/docs/api/java/lang/
> > ref/ReferenceQueue.html)
> > in combination with weak references to allocated buffers. when a buffer
> is
> > reclaimed by the GC the corresponding weak ref to it is enqueued. the
> pool
> > maintains a set of outstanding buffer IDs (every allocated buffer gets a
> > unique id - basically a sequence). a buffer explicitly returned has its
> id
> > removed from the tracking set and the weak reference to it destroyed, so
> > its reference will never be enqueued by the GC even if it is GC'ed later.
> > an enqueued reference (which indicates a buffer not returned to pool)
> also
> > carries the buffer id, which is then removed from the outstanding buffers
> > set and the memory marked as available (and a warning printed). the pool
> > has a background thread dedicated to reading references out of the queue
> > (which under normal conditions remains blocked forever).
> >
> > 12 - the issue here is that a single "large" request (say 1MB) can get
> > blocked indefinitely under a high pressure of much smaller requests (say
> > 1KB) keeping memory utilization close to 100%. if we dont care about
> > potential starvation the change is in a single condition. i'll make this
> > configurable.
> >
> > 13 - I'll change the parameter name.
> >
> > On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao  wrote:
> >
> > > Hi, Radai,
> > >
> > > Thanks for the updated KIP

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-24 Thread Jun Rao
Hi, Radai,

For 10, yes, we don't want the buffer pool to wake up the selector every time
some memory is freed up. We only want to do that when there are pending
requests to the buffer pool that were not honored due to insufficient memory.

For 11, we probably want to be a bit careful with Weak References. In
https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an
implementation based on Weak Reference, but abandoned it due to too much GC
overhead. It probably also makes the code a bit harder to understand. So,
perhaps it would be better if we can avoid it.

For 12, that's a good point. I thought the selector would do some shuffling
for fairness. Perhaps we should stop allocating from the buffer pool when we
see the first key whose memory request can't be honored?

Thanks,

Jun


On Sat, Sep 24, 2016 at 8:44 AM, radai  wrote:

> Hi Jun,
>
> 10 - I'll add this functionality to the mute/unmute branch. as every
> mute/unmute operation is O(#connections / #selectorThreads) maybe a
> watermark approach is better than waking when _any_ mem is available?
>
> 11 - "gc notifications" are done by using a ReferenceQueue (
> https://docs.oracle.com/javase/8/docs/api/java/lang/
> ref/ReferenceQueue.html)
> in combination with weak references to allocated buffers. when a buffer is
> reclaimed by the GC the corresponding weak ref to it is enqueued. the pool
> maintains a set of outstanding buffer IDs (every allocated buffer gets a
> unique id - basically a sequence). a buffer explicitly returned has its id
> removed from the tracking set and the weak reference to it destroyed, so
> its reference will never be enqueued by the GC even if it is GC'ed later.
> an enqueued reference (which indicates a buffer not returned to pool) also
> carries the buffer id, which is then removed from the outstanding buffers
> set and the memory marked as available (and a warning printed). the pool
> has a background thread dedicated to reading references out of the queue
> (which under normal conditions remains blocked forever).
>
> 12 - the issue here is that a single "large" request (say 1MB) can get
> blocked indefinitely under a high pressure of much smaller requests (say
> 1KB) keeping memory utilization close to 100%. if we dont care about
> potential starvation the change is in a single condition. i'll make this
> configurable.
>
> 13 - I'll change the parameter name.
>
> On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao  wrote:
>
> > Hi, Radai,
> >
> > Thanks for the updated KIP. A few more questions/comments below.
> >
> > 10. For "the mute/unmute happens just before poll(), which means as a
> worst
> > case there will be no reads for 300ms if memory was unavailable", I am
> > thinking that memory-pool could track if there is any pending request and
> > wake up the selector when memory is released and there is a pending
> > request. This way, poll() doesn't have to wait for the timeout if memory
> > frees up early.
> >
> > 11. For "to facilitate faster implementation (as a safety net) the pool
> > will be implemented in such a way that memory that was not release()ed
> (but
> > still garbage collected) would be detected and "reclaimed". this is to
> > prevent "leaks" in case of code paths that fail to release() properly.",
> > could you explain a bit at the high level how this is done?
> >
> > 12. For "As the pool would allow any size request if it has any capacity
> > available, the actual memory bound is queued.max.bytes +
> > socket.request.max.bytes.", it seems intuitively, the pool should only
> give
> > the Buffer back if it has enough available bytes. Then the request memory
> > can be bounded by queued.max.bytes. We can validate that queued.max.bytes
> > is at least socket.request.max.bytes.
> >
> > 13. For the naming, it seems request.queue.max.bytes is clearer than
> > queue.max.bytes.
> >
> > Jun
> >
> >
> >
> > On Thu, Sep 22, 2016 at 10:53 AM, radai 
> > wrote:
> >
> > > As discussed in the KIP call, I have updated the kip-72 page (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> > > to record both configuration validations and implementation concerns.
> > > I've also implemented channel muting/unmuting in response to memory
> > > pressure. its available as a separate branch here -
> > > https://github.com/radai-rosenblatt/kafka/tree/broker-
> > > memory-pool-with-muting
> > > . the implementation without muting is here -
> > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
> > >
> > > the mute/unmute happens just before poll(), which means as a worst case
> > > there will be no reads for 300ms if memory was unavailable (thats the
> > > timeout untill the next poll). perhaps a design with dedicated read
> > threads
> > > could do better (such a thread could actually block waiting for
> memory),
> > > but that would be a giant change.
> > >
> > > On Tue, Sep 13, 2016 at 9:20 AM, radai 
> > wrote:
> > >
> > > > the specific memo

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-24 Thread radai
Hi Jun,

10 - I'll add this functionality to the mute/unmute branch. Since every
mute/unmute operation is O(#connections / #selectorThreads), maybe a watermark
approach is better than waking up when _any_ memory is available?
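
A rough sketch of what I mean by a watermark approach (the class, threshold and
callback here are illustrative assumptions, not code from the branch):

import java.util.concurrent.atomic.AtomicLong;

public class WatermarkNotifyingPoolSketch {
    private final AtomicLong available;
    private final long lowWatermark;       // e.g. some fraction of capacity - assumed threshold
    private final Runnable wakeupCallback; // e.g. selector::wakeup

    public WatermarkNotifyingPoolSketch(long capacity, long lowWatermark, Runnable wakeupCallback) {
        this.available = new AtomicLong(capacity);
        this.lowWatermark = lowWatermark;
        this.wakeupCallback = wakeupCallback;
    }

    public void release(long size) {
        long before = available.getAndAdd(size);
        // only wake the selector (and pay the O(#connections) unmute cost) when free
        // memory crosses the watermark from below, not on every single release()
        if (before < lowWatermark && before + size >= lowWatermark) {
            wakeupCallback.run();
        }
    }
}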

11 - "gc notifications" are done by using a ReferenceQueue (
https://docs.oracle.com/javase/8/docs/api/java/lang/ref/ReferenceQueue.html)
in combination with weak references to allocated buffers. when a buffer is
reclaimed by the GC the corresponding weak ref to it is enqueued. the pool
maintains a set of outstanding buffer IDs (every allocated buffer gets a
unique id - basically a sequence). a buffer explicitly returned has its id
removed from the tracking set and the weak reference to it destroyed, so
its reference will never be enqueued by the GC even if it is GC'ed later.
an enqueued reference (which indicates a buffer not returned to pool) also
carries the buffer id, which is then removed from the outstanding buffers
set and the memory marked as available (and a warning printed). the pool
has a background thread dedicated to reading references out of the queue
(which under normal conditions remains blocked forever).
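
A condensed sketch of that mechanism (the class, the tryAllocate/release
signatures and the id bookkeeping are simplified for illustration - the real
GarbageCollectedMemoryPool in the branch differs in the details):

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class GcLeakDetectingPoolSketch {
    private final AtomicLong available;
    private final AtomicLong idGenerator = new AtomicLong(0);
    // one entry per buffer that was handed out and not yet release()d
    private final ConcurrentMap<Long, BufferReference> outstanding = new ConcurrentHashMap<>();
    private final ReferenceQueue<ByteBuffer> gcQueue = new ReferenceQueue<>();

    private static final class BufferReference extends WeakReference<ByteBuffer> {
        final long id;
        final int size;
        BufferReference(long id, ByteBuffer buf, int size, ReferenceQueue<ByteBuffer> q) {
            super(buf, q);
            this.id = id;
            this.size = size;
        }
    }

    public GcLeakDetectingPoolSketch(long capacity) {
        this.available = new AtomicLong(capacity);
        Thread reaper = new Thread(this::drainGcQueue, "pool-gc-reaper");
        reaper.setDaemon(true);
        reaper.start(); // under normal conditions this thread stays blocked on the queue
    }

    // grants any size as long as *some* memory is left (the counter may go negative)
    public ByteBuffer tryAllocate(int size) {
        if (available.get() <= 0)
            return null;
        available.addAndGet(-size);
        ByteBuffer buf = ByteBuffer.allocate(size);
        long id = idGenerator.incrementAndGet();
        outstanding.put(id, new BufferReference(id, buf, size, gcQueue));
        return buf;
    }

    // explicit return: drop the tracking entry and clear the weak ref so the GC never enqueues it
    public void release(long id) {
        BufferReference ref = outstanding.remove(id);
        if (ref != null) {
            ref.clear();
            available.addAndGet(ref.size);
        }
    }

    private void drainGcQueue() {
        while (true) {
            try {
                Reference<? extends ByteBuffer> r = gcQueue.remove(); // blocks until a leak shows up
                BufferReference leaked = (BufferReference) r;
                if (outstanding.remove(leaked.id) != null) {
                    available.addAndGet(leaked.size); // reclaim the leaked memory
                    System.err.println("WARN: buffer " + leaked.id + " was never release()d");
                }
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}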

12 - the issue here is that a single "large" request (say 1MB) can get blocked
indefinitely under high pressure from much smaller requests (say 1KB) keeping
memory utilization close to 100%. If we don't care about potential starvation,
the change is in a single condition. I'll make this configurable.

13 - I'll change the parameter name.

On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao  wrote:

> Hi, Radai,
>
> Thanks for the updated KIP. A few more questions/comments below.
>
> 10. For "the mute/unmute happens just before poll(), which means as a worst
> case there will be no reads for 300ms if memory was unavailable", I am
> thinking that memory-pool could track if there is any pending request and
> wake up the selector when memory is released and there is a pending
> request. This way, poll() doesn't have to wait for the timeout if memory
> frees up early.
>
> 11. For "to facilitate faster implementation (as a safety net) the pool
> will be implemented in such a way that memory that was not release()ed (but
> still garbage collected) would be detected and "reclaimed". this is to
> prevent "leaks" in case of code paths that fail to release() properly.",
> could you explain a bit at the high level how this is done?
>
> 12. For "As the pool would allow any size request if it has any capacity
> available, the actual memory bound is queued.max.bytes +
> socket.request.max.bytes.", it seems intuitively, the pool should only give
> the Buffer back if it has enough available bytes. Then the request memory
> can be bounded by queued.max.bytes. We can validate that queued.max.bytes
> is at least socket.request.max.bytes.
>
> 13. For the naming, it seems request.queue.max.bytes is clearer than
> queue.max.bytes.
>
> Jun
>
>
>
> On Thu, Sep 22, 2016 at 10:53 AM, radai 
> wrote:
>
> > As discussed in the KIP call, I have updated the kip-72 page (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> > to record both configuration validations and implementation concerns.
> > I've also implemented channel muting/unmuting in response to memory
> > pressure. its available as a separate branch here -
> > https://github.com/radai-rosenblatt/kafka/tree/broker-
> > memory-pool-with-muting
> > . the implementation without muting is here -
> > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
> >
> > the mute/unmute happens just before poll(), which means as a worst case
> > there will be no reads for 300ms if memory was unavailable (thats the
> > timeout untill the next poll). perhaps a design with dedicated read
> threads
> > could do better (such a thread could actually block waiting for memory),
> > but that would be a giant change.
> >
> > On Tue, Sep 13, 2016 at 9:20 AM, radai 
> wrote:
> >
> > > the specific memory pool implementation i wrote will allocate _any_
> > amount
> > > you request if it has _any_ memory available (so if it has 1 byte
> > available
> > > and you ask for 1MB you will get 1MB and the counter will go negative).
> > > this was done to avoid issues with starvation of large requests. other
> > > implementations may be more strict. to me this means that generally its
> > not
> > > a simple "have memory" vs "no memory" split (which gets worse under a
> > > hypothetical tiered pool scheme for QoS).
> > >
> > > to allow this flexibility in pool implementation i must preserve the
> > > amount of memory required. once read from the channel i cant put it
> back,
> > > so i store it?
> > >
> > > On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com> wrote:
> > >
> > >> Is there any value in allowing the 4-byte size to be read even when
> the
> > >> request memory limit has been reached? If not, you can disable OP_READ
> > >> interest for all channels that are 

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-23 Thread Jun Rao
Hi, Radai,

Thanks for the updated KIP. A few more questions/comments below.

10. For "the mute/unmute happens just before poll(), which means as a worst
case there will be no reads for 300ms if memory was unavailable", I am
thinking that memory-pool could track if there is any pending request and
wake up the selector when memory is released and there is a pending
request. This way, poll() doesn't have to wait for the timeout if memory
frees up early.

11. For "to facilitate faster implementation (as a safety net) the pool
will be implemented in such a way that memory that was not release()ed (but
still garbage collected) would be detected and "reclaimed". this is to
prevent "leaks" in case of code paths that fail to release() properly.",
could you explain a bit at the high level how this is done?

12. For "As the pool would allow any size request if it has any capacity
available, the actual memory bound is queued.max.bytes +
socket.request.max.bytes.", it seems intuitively, the pool should only give
the Buffer back if it has enough available bytes. Then the request memory
can be bounded by queued.max.bytes. We can validate that queued.max.bytes
is at least socket.request.max.bytes.

13. For the naming, it seems request.queue.max.bytes is clearer than
queue.max.bytes.

Jun



On Thu, Sep 22, 2016 at 10:53 AM, radai  wrote:

> As discussed in the KIP call, I have updated the kip-72 page (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> to record both configuration validations and implementation concerns.
> I've also implemented channel muting/unmuting in response to memory
> pressure. its available as a separate branch here -
> https://github.com/radai-rosenblatt/kafka/tree/broker-
> memory-pool-with-muting
> . the implementation without muting is here -
> https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
>
> the mute/unmute happens just before poll(), which means as a worst case
> there will be no reads for 300ms if memory was unavailable (thats the
> timeout untill the next poll). perhaps a design with dedicated read threads
> could do better (such a thread could actually block waiting for memory),
> but that would be a giant change.
>
> On Tue, Sep 13, 2016 at 9:20 AM, radai  wrote:
>
> > the specific memory pool implementation i wrote will allocate _any_
> amount
> > you request if it has _any_ memory available (so if it has 1 byte
> available
> > and you ask for 1MB you will get 1MB and the counter will go negative).
> > this was done to avoid issues with starvation of large requests. other
> > implementations may be more strict. to me this means that generally its
> not
> > a simple "have memory" vs "no memory" split (which gets worse under a
> > hypothetical tiered pool scheme for QoS).
> >
> > to allow this flexibility in pool implementation i must preserve the
> > amount of memory required. once read from the channel i cant put it back,
> > so i store it?
> >
> > On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> >> Is there any value in allowing the 4-byte size to be read even when the
> >> request memory limit has been reached? If not, you can disable OP_READ
> >> interest for all channels that are ready inside Selector.poll() when
> >> memory
> >> limit has been reached and re-enable before returning from poll().
> Perhaps
> >> a listener that is invoked when MemoryPool moves from unavailable to
> >> available state can wakeup the selector. The changes for this should be
> >> fairly contained without any additional channel state. And it would
> avoid
> >> the overhead of polls that return immediately even when progress cannot
> be
> >> made because memory limit has been reached.
> >>
> >> On Tue, Sep 13, 2016 at 12:31 AM, radai 
> >> wrote:
> >>
> >> > Hi Jun,
> >> >
> >> > Yes, youre right - right now the next select() call will return
> >> immediately
> >> > with the same set of keys as earlier (at least) as they were not
> >> previously
> >> > handled (no memory).
> >> > My assumption is that this happens under considerable load - something
> >> has
> >> > to be occupying all this memory. also, this happens in the context of
> >> > SocketServer.Processor.run():
> >> >
> >> > while (isRunning) {
> >> >configureNewConnections()
> >> >processNewResponses()
> >> >poll()   <-- HERE
> >> >processCompletedReceives()
> >> >processCompletedSends()
> >> >processDisconnected()
> >> > }
> >> >
> >> > even within poll(), things like finishConnection(), prepare(), and
> >> write()s
> >> > can still make progress under low memory conditions. and given the
> load,
> >> > there's probably progress to be made in processCompletedReceives(),
> >> > processCompletedSends() and processDisconnected().
> >> >
> >> > if there's progress to be made in other things its likely that the
> next
> >> > call to poll() will not happen immediately and so t

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-22 Thread radai
As discussed in the KIP call, I have updated the KIP-72 page
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
to record both the configuration validations and the implementation concerns.
I've also implemented channel muting/unmuting in response to memory pressure.
It's available as a separate branch here -
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting
- and the implementation without muting is here -
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.

The mute/unmute happens just before poll(), which means that as a worst case
there will be no reads for 300ms if memory was unavailable (that's the timeout
until the next poll). Perhaps a design with dedicated read threads could do
better (such a thread could actually block waiting for memory), but that would
be a giant change.
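
To illustrate where the muting sits, here is a toy model of the processor-loop
behaviour (this is not the actual SocketServer/Selector change - all names are
made up for illustration):

import java.util.HashSet;
import java.util.Set;

public class MuteBeforePollSketch {
    interface Pool { long availableMemory(); }
    interface Channel { void mute(); void unmute(); }

    private final Set<Channel> mutedForMemory = new HashSet<>();

    // called once per processor-loop iteration, immediately before poll(300)
    void maybeMuteOrUnmute(Iterable<Channel> channels, Pool pool) {
        if (pool.availableMemory() <= 0) {
            for (Channel c : channels) {       // out of memory: stop reading everywhere
                c.mute();
                mutedForMemory.add(c);
            }
        } else {
            for (Channel c : mutedForMemory)   // memory freed up since the last iteration
                c.unmute();
            mutedForMemory.clear();
        }
        // poll(300) runs next; if memory frees up *during* that poll, nothing reacts
        // until the next loop iteration - hence the "no reads for up to 300ms" worst case
    }
}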

On Tue, Sep 13, 2016 at 9:20 AM, radai  wrote:

> the specific memory pool implementation i wrote will allocate _any_ amount
> you request if it has _any_ memory available (so if it has 1 byte available
> and you ask for 1MB you will get 1MB and the counter will go negative).
> this was done to avoid issues with starvation of large requests. other
> implementations may be more strict. to me this means that generally its not
> a simple "have memory" vs "no memory" split (which gets worse under a
> hypothetical tiered pool scheme for QoS).
>
> to allow this flexibility in pool implementation i must preserve the
> amount of memory required. once read from the channel i cant put it back,
> so i store it?
>
> On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
>> Is there any value in allowing the 4-byte size to be read even when the
>> request memory limit has been reached? If not, you can disable OP_READ
>> interest for all channels that are ready inside Selector.poll() when
>> memory
>> limit has been reached and re-enable before returning from poll(). Perhaps
>> a listener that is invoked when MemoryPool moves from unavailable to
>> available state can wakeup the selector. The changes for this should be
>> fairly contained without any additional channel state. And it would avoid
>> the overhead of polls that return immediately even when progress cannot be
>> made because memory limit has been reached.
>>
>> On Tue, Sep 13, 2016 at 12:31 AM, radai 
>> wrote:
>>
>> > Hi Jun,
>> >
>> > Yes, youre right - right now the next select() call will return
>> immediately
>> > with the same set of keys as earlier (at least) as they were not
>> previously
>> > handled (no memory).
>> > My assumption is that this happens under considerable load - something
>> has
>> > to be occupying all this memory. also, this happens in the context of
>> > SocketServer.Processor.run():
>> >
>> > while (isRunning) {
>> >configureNewConnections()
>> >processNewResponses()
>> >poll()   <-- HERE
>> >processCompletedReceives()
>> >processCompletedSends()
>> >processDisconnected()
>> > }
>> >
>> > even within poll(), things like finishConnection(), prepare(), and
>> write()s
>> > can still make progress under low memory conditions. and given the load,
>> > there's probably progress to be made in processCompletedReceives(),
>> > processCompletedSends() and processDisconnected().
>> >
>> > if there's progress to be made in other things its likely that the next
>> > call to poll() will not happen immediately and so the loop wont be that
>> > tight. in order for this to devolve into true busy waiting you would
>> need a
>> > situation where no progress can be made on any in-progress requests and
>> no
>> > responses to send out ?
>> >
>> > if my assumption does not hold then you are correct, and
>> selector.poll(300)
>> > currently hardcoded in SocketServer.Processor.poll() would need to be
>> > replaced with something more complicated. my biggest point of concern
>> > though is that the resulting code would be complicated and would couple
>> > Selector to the memory pool very tightly. undey my current patch
>> Selector
>> > needs the memory pool only to pass to channels when they are built. this
>> > would allow different memory pools relatively easily for things like
>> > reserving memory for cross-broker replication and high-SLA connections.
>> a
>> > tighter coupling would make any such future modification hard.
>> >
>> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao  wrote:
>> >
>> > > Hi, Radai,
>> > >
>> > > Thanks for the reply. I still have a followup question on #2.
>> > >
>> > > My understanding is that in your proposal, selector will now first
>> read
>> > the
>> > > size of the Receive. If there is not enough memory, it has to turn off
>> > the
>> > > READ interest bit for the corresponding KafkaChannel. Otherwise,
>> > subsequent
>> > > selector.poll() call will always return immediately, adding
>> unnecessary
>> > > overhead. If you do that, the  Selector will need to know when to
>> turn on

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-13 Thread radai
The specific memory pool implementation I wrote will allocate _any_ amount you
request if it has _any_ memory available (so if it has 1 byte available and you
ask for 1MB, you will get 1MB and the counter will go negative). This was done
to avoid issues with starvation of large requests. Other implementations may be
more strict. To me this means that generally it's not a simple "have memory"
vs. "no memory" split (which gets worse under a hypothetical tiered pool scheme
for QoS).

To allow this flexibility in the pool implementation I must preserve the amount
of memory required (the 4-byte size prefix). Once it has been read from the
channel I can't put it back, so I store it?
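
Roughly, what I mean is a size-then-payload read flow along these lines (a
simplified sketch - the names and the pool interface are illustrative, not the
actual NetworkReceive/KafkaChannel code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class SizeFirstReceiveSketch {
    public interface PoolSketch { ByteBuffer tryAllocate(int size); } // assumed non-blocking pool API

    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    private ByteBuffer payload;        // stays null until the pool grants memory
    private int requestSize = -1;      // remembered once read - it can't be "un-read"

    public long readFrom(ReadableByteChannel channel, PoolSketch pool) throws IOException {
        long read = 0;
        if (sizeBuffer.hasRemaining())
            read += channel.read(sizeBuffer);        // 1. the 4-byte size prefix is read unconditionally
        if (!sizeBuffer.hasRemaining() && requestSize < 0) {
            sizeBuffer.flip();
            requestSize = sizeBuffer.getInt();       // 2. store the size - this is the state we must keep
        }
        if (requestSize >= 0 && payload == null)
            payload = pool.tryAllocate(requestSize); // 3. may return null under memory pressure;
                                                     //    we simply retry on a later poll()
        if (payload != null)
            read += channel.read(payload);           // 4. only then read the request body itself
        return read;
    }

    public boolean complete() {
        return payload != null && !payload.hasRemaining();
    }
}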

On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Is there any value in allowing the 4-byte size to be read even when the
> request memory limit has been reached? If not, you can disable OP_READ
> interest for all channels that are ready inside Selector.poll() when memory
> limit has been reached and re-enable before returning from poll(). Perhaps
> a listener that is invoked when MemoryPool moves from unavailable to
> available state can wakeup the selector. The changes for this should be
> fairly contained without any additional channel state. And it would avoid
> the overhead of polls that return immediately even when progress cannot be
> made because memory limit has been reached.
>
> On Tue, Sep 13, 2016 at 12:31 AM, radai 
> wrote:
>
> > Hi Jun,
> >
> > Yes, youre right - right now the next select() call will return
> immediately
> > with the same set of keys as earlier (at least) as they were not
> previously
> > handled (no memory).
> > My assumption is that this happens under considerable load - something
> has
> > to be occupying all this memory. also, this happens in the context of
> > SocketServer.Processor.run():
> >
> > while (isRunning) {
> >configureNewConnections()
> >processNewResponses()
> >poll()   <-- HERE
> >processCompletedReceives()
> >processCompletedSends()
> >processDisconnected()
> > }
> >
> > even within poll(), things like finishConnection(), prepare(), and
> write()s
> > can still make progress under low memory conditions. and given the load,
> > there's probably progress to be made in processCompletedReceives(),
> > processCompletedSends() and processDisconnected().
> >
> > if there's progress to be made in other things its likely that the next
> > call to poll() will not happen immediately and so the loop wont be that
> > tight. in order for this to devolve into true busy waiting you would
> need a
> > situation where no progress can be made on any in-progress requests and
> no
> > responses to send out ?
> >
> > if my assumption does not hold then you are correct, and
> selector.poll(300)
> > currently hardcoded in SocketServer.Processor.poll() would need to be
> > replaced with something more complicated. my biggest point of concern
> > though is that the resulting code would be complicated and would couple
> > Selector to the memory pool very tightly. undey my current patch Selector
> > needs the memory pool only to pass to channels when they are built. this
> > would allow different memory pools relatively easily for things like
> > reserving memory for cross-broker replication and high-SLA connections. a
> > tighter coupling would make any such future modification hard.
> >
> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao  wrote:
> >
> > > Hi, Radai,
> > >
> > > Thanks for the reply. I still have a followup question on #2.
> > >
> > > My understanding is that in your proposal, selector will now first read
> > the
> > > size of the Receive. If there is not enough memory, it has to turn off
> > the
> > > READ interest bit for the corresponding KafkaChannel. Otherwise,
> > subsequent
> > > selector.poll() call will always return immediately, adding unnecessary
> > > overhead. If you do that, the  Selector will need to know when to turn
> on
> > > the READ interest bit again. It may not be enough to do this check
> until
> > > the next poll call since the timeout used by poll() could be
> arbitrarily
> > > large. So, it seems that some kind of coordination between the Selector
> > and
> > > the bufferpool is needed?
> > >
> > > Jun
> > >
> > > On Thu, Sep 8, 2016 at 7:02 PM, radai 
> > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 1. yes, it is my own personal opinion that people use
> > queued.max.requests
> > > > as an indirect way to bound memory consumption. once a more direct
> > memory
> > > > bound mechanism exists (and works) i dont think queued.max.requests
> > woul
> > > > dbe required. having said that I was not planning on making any
> changes
> > > > w.r.t queued.max.requests support (so I was aiming to get to a
> > situation
> > > > where both configs are supported) to allow gathering enough
> > > data/feedback.
> > > >
> > > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a
> > > > NetworkReceive. multiple such read() calls may be required until a
>

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-13 Thread Rajini Sivaram
Is there any value in allowing the 4-byte size to be read even when the
request memory limit has been reached? If not, you can disable OP_READ
interest for all channels that are ready inside Selector.poll() when memory
limit has been reached and re-enable before returning from poll(). Perhaps
a listener that is invoked when MemoryPool moves from unavailable to
available state can wakeup the selector. The changes for this should be
fairly contained without any additional channel state. And it would avoid
the overhead of polls that return immediately even when progress cannot be
made because memory limit has been reached.
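
A rough illustration of that approach using raw NIO types (Kafka's own
Selector/KafkaChannel wrappers are not shown, and this is a sketch of the idea
rather than an actual patch):

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class ReadInterestGateSketch {

    // at the start of poll(), when the pool reports that the memory limit has been reached
    public static void dropReadInterest(Selector selector) {
        for (SelectionKey key : selector.keys()) {
            if (key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0)
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        }
    }

    // just before poll() returns, so the next poll() can read again
    // (a real implementation would remember which keys it muted instead of
    //  blindly re-adding OP_READ to every key, as done here for brevity)
    public static void restoreReadInterest(Selector selector) {
        for (SelectionKey key : selector.keys()) {
            if (key.isValid())
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
        }
    }

    // a MemoryPool listener along these lines could be invoked when the pool moves
    // from "unavailable" to "available", so a long select timeout doesn't delay reads
    public static Runnable wakeupListener(Selector selector) {
        return selector::wakeup;
    }
}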

On Tue, Sep 13, 2016 at 12:31 AM, radai  wrote:

> Hi Jun,
>
> Yes, youre right - right now the next select() call will return immediately
> with the same set of keys as earlier (at least) as they were not previously
> handled (no memory).
> My assumption is that this happens under considerable load - something has
> to be occupying all this memory. also, this happens in the context of
> SocketServer.Processor.run():
>
> while (isRunning) {
>configureNewConnections()
>processNewResponses()
>poll()   <-- HERE
>processCompletedReceives()
>processCompletedSends()
>processDisconnected()
> }
>
> even within poll(), things like finishConnection(), prepare(), and write()s
> can still make progress under low memory conditions. and given the load,
> there's probably progress to be made in processCompletedReceives(),
> processCompletedSends() and processDisconnected().
>
> if there's progress to be made in other things its likely that the next
> call to poll() will not happen immediately and so the loop wont be that
> tight. in order for this to devolve into true busy waiting you would need a
> situation where no progress can be made on any in-progress requests and no
> responses to send out ?
>
> if my assumption does not hold then you are correct, and selector.poll(300)
> currently hardcoded in SocketServer.Processor.poll() would need to be
> replaced with something more complicated. my biggest point of concern
> though is that the resulting code would be complicated and would couple
> Selector to the memory pool very tightly. undey my current patch Selector
> needs the memory pool only to pass to channels when they are built. this
> would allow different memory pools relatively easily for things like
> reserving memory for cross-broker replication and high-SLA connections. a
> tighter coupling would make any such future modification hard.
>
> On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao  wrote:
>
> > Hi, Radai,
> >
> > Thanks for the reply. I still have a followup question on #2.
> >
> > My understanding is that in your proposal, selector will now first read
> the
> > size of the Receive. If there is not enough memory, it has to turn off
> the
> > READ interest bit for the corresponding KafkaChannel. Otherwise,
> subsequent
> > selector.poll() call will always return immediately, adding unnecessary
> > overhead. If you do that, the  Selector will need to know when to turn on
> > the READ interest bit again. It may not be enough to do this check until
> > the next poll call since the timeout used by poll() could be arbitrarily
> > large. So, it seems that some kind of coordination between the Selector
> and
> > the bufferpool is needed?
> >
> > Jun
> >
> > On Thu, Sep 8, 2016 at 7:02 PM, radai 
> wrote:
> >
> > > Hi Jun,
> > >
> > > 1. yes, it is my own personal opinion that people use
> queued.max.requests
> > > as an indirect way to bound memory consumption. once a more direct
> memory
> > > bound mechanism exists (and works) i dont think queued.max.requests
> woul
> > > dbe required. having said that I was not planning on making any changes
> > > w.r.t queued.max.requests support (so I was aiming to get to a
> situation
> > > where both configs are supported) to allow gathering enough
> > data/feedback.
> > >
> > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a
> > > NetworkReceive. multiple such read() calls may be required until a
> > Receive
> > > is produced already in the current code base. my pool implementation is
> > > non-blocking so if there's no memory available the read() call will
> > return
> > > null. poll() would then move on to try and service other selection
> keys.
> > > the pool will be checked for available memory again the next time the
> > > SocketServer.run() loop gets to poll(). and so right now I dont
> > communicate
> > > memory becoming available to the selector - it will just go on to try
> and
> > > make progress elsewhere and come back again. i never block it or send
> it
> > to
> > > sleep. I think for efficiency what could maybe be done is if there's
> not
> > > enough memory to service a readable selection key we may want to skip
> all
> > > other read-ready selection keys for that iteration of
> > pollSelectionKeys().
> > > that would require rather invasive changes around
> > > Selector.pollSelectionKeys() that I'd r

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-12 Thread radai
Hi Jun,

Yes, you're right - right now the next select() call will return immediately
with (at least) the same set of keys as earlier, since they were not previously
handled (no memory).
My assumption is that this happens under considerable load - something has to
be occupying all this memory. Also, this happens in the context of
SocketServer.Processor.run():

while (isRunning) {
   configureNewConnections()
   processNewResponses()
   poll()   <-- HERE
   processCompletedReceives()
   processCompletedSends()
   processDisconnected()
}

Even within poll(), things like finishConnection(), prepare(), and write()s
can still make progress under low-memory conditions. And given the load,
there's probably progress to be made in processCompletedReceives(),
processCompletedSends() and processDisconnected().

If there's progress to be made on other things, it's likely that the next call
to poll() will not happen immediately, and so the loop won't be that tight. In
order for this to devolve into true busy-waiting you would need a situation
where no progress can be made on any in-progress requests and there are no
responses to send out?

If my assumption does not hold then you are correct, and the selector.poll(300)
currently hardcoded in SocketServer.Processor.poll() would need to be replaced
with something more complicated. My biggest point of concern, though, is that
the resulting code would be complicated and would couple the Selector to the
memory pool very tightly. Under my current patch the Selector needs the memory
pool only to pass to channels when they are built. This would allow different
memory pools relatively easily for things like reserving memory for
cross-broker replication and high-SLA connections; a tighter coupling would
make any such future modification hard.

On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao  wrote:

> Hi, Radai,
>
> Thanks for the reply. I still have a followup question on #2.
>
> My understanding is that in your proposal, selector will now first read the
> size of the Receive. If there is not enough memory, it has to turn off the
> READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent
> selector.poll() call will always return immediately, adding unnecessary
> overhead. If you do that, the  Selector will need to know when to turn on
> the READ interest bit again. It may not be enough to do this check until
> the next poll call since the timeout used by poll() could be arbitrarily
> large. So, it seems that some kind of coordination between the Selector and
> the bufferpool is needed?
>
> Jun
>
> On Thu, Sep 8, 2016 at 7:02 PM, radai  wrote:
>
> > Hi Jun,
> >
> > 1. yes, it is my own personal opinion that people use queued.max.requests
> > as an indirect way to bound memory consumption. once a more direct memory
> > bound mechanism exists (and works) i dont think queued.max.requests woul
> > dbe required. having said that I was not planning on making any changes
> > w.r.t queued.max.requests support (so I was aiming to get to a situation
> > where both configs are supported) to allow gathering enough
> data/feedback.
> >
> > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a
> > NetworkReceive. multiple such read() calls may be required until a
> Receive
> > is produced already in the current code base. my pool implementation is
> > non-blocking so if there's no memory available the read() call will
> return
> > null. poll() would then move on to try and service other selection keys.
> > the pool will be checked for available memory again the next time the
> > SocketServer.run() loop gets to poll(). and so right now I dont
> communicate
> > memory becoming available to the selector - it will just go on to try and
> > make progress elsewhere and come back again. i never block it or send it
> to
> > sleep. I think for efficiency what could maybe be done is if there's not
> > enough memory to service a readable selection key we may want to skip all
> > other read-ready selection keys for that iteration of
> pollSelectionKeys().
> > that would require rather invasive changes around
> > Selector.pollSelectionKeys() that I'd rather avoid. also different
> > KafkaChannels may be backed by different memory pool (under some sort of
> > future QoS scheme?), which would complicate such an optimization further.
> >
> > 3. i added the pool interface and implementation under
> kafka.common.memory,
> > and the API is "thin" enough to be generally useful (currently its
> > non-blocking only, but a get(long maxWait) is definitely doable). having
> > said that, I'm not really familiar enough with the code to say
> >
> >
> >
> > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao  wrote:
> >
> > > Hi, Radi,
> > >
> > > Thanks for the update. At the high level, this looks promising. A few
> > > comments below.
> > >
> > > 1. If we can bound the requests by bytes, it seems that we don't need
> > > queued.max.requests
> > > any more? Could we just deprecate the config and make the queue size
> > > unbounded?

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-11 Thread Jun Rao
Hi, Radai,

Thanks for the reply. I still have a followup question on #2.

My understanding is that in your proposal, the selector will now first read
the size of the Receive. If there is not enough memory, it has to turn off the
READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent
selector.poll() calls will always return immediately, adding unnecessary
overhead. If you do that, the Selector will need to know when to turn on the
READ interest bit again. It may not be enough to do this check at the next
poll call, since the timeout used by poll() could be arbitrarily large. So it
seems that some kind of coordination between the Selector and the buffer pool
is needed?

Jun

On Thu, Sep 8, 2016 at 7:02 PM, radai  wrote:

> Hi Jun,
>
> 1. yes, it is my own personal opinion that people use queued.max.requests
> as an indirect way to bound memory consumption. once a more direct memory
> bound mechanism exists (and works) i dont think queued.max.requests woul
> dbe required. having said that I was not planning on making any changes
> w.r.t queued.max.requests support (so I was aiming to get to a situation
> where both configs are supported) to allow gathering enough data/feedback.
>
> 2. Selector.poll() calls into KafkaChannel.read() to maybe get a
> NetworkReceive. multiple such read() calls may be required until a Receive
> is produced already in the current code base. my pool implementation is
> non-blocking so if there's no memory available the read() call will return
> null. poll() would then move on to try and service other selection keys.
> the pool will be checked for available memory again the next time the
> SocketServer.run() loop gets to poll(). and so right now I dont communicate
> memory becoming available to the selector - it will just go on to try and
> make progress elsewhere and come back again. i never block it or send it to
> sleep. I think for efficiency what could maybe be done is if there's not
> enough memory to service a readable selection key we may want to skip all
> other read-ready selection keys for that iteration of pollSelectionKeys().
> that would require rather invasive changes around
> Selector.pollSelectionKeys() that I'd rather avoid. also different
> KafkaChannels may be backed by different memory pool (under some sort of
> future QoS scheme?), which would complicate such an optimization further.
>
> 3. i added the pool interface and implementation under kafka.common.memory,
> and the API is "thin" enough to be generally useful (currently its
> non-blocking only, but a get(long maxWait) is definitely doable). having
> said that, I'm not really familiar enough with the code to say
>
>
>
> On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao  wrote:
>
> > Hi, Radi,
> >
> > Thanks for the update. At the high level, this looks promising. A few
> > comments below.
> >
> > 1. If we can bound the requests by bytes, it seems that we don't need
> > queued.max.requests
> > any more? Could we just deprecate the config and make the queue size
> > unbounded?
> > 2. How do we communicate back to the selector when some memory is freed
> up?
> > We probably need to wake up the selector. For efficiency, perhaps we only
> > need to wake up the selector if the bufferpool is full?
> > 3. We talked about bounding the consumer's memory before. To fully
> support
> > that, we will need to bound the memory used by different fetch responses
> in
> > the consumer. Do you think the changes that you propose here can be
> > leveraged to bound the memory in the consumer as well?
> >
> > Jun
> >
> >
> > On Tue, Aug 30, 2016 at 10:41 AM, radai 
> > wrote:
> >
> > > My apologies for the delay in response.
> > >
> > > I agree with the concerns about OOM reading from the actual sockets and
> > > blocking the network threads - messing with the request queue itself
> > would
> > > not do.
> > >
> > > I propose instead a memory pool approach - the broker would have a non
> > > blocking memory pool. upon reading the first 4 bytes out of a socket an
> > > attempt would be made to acquire enough memory and if that attempt
> fails
> > > the processing thread will move on to try and make progress with other
> > > tasks.
> > >
> > > I think Its simpler than mute/unmute because using mute/unmute would
> > > require differentiating between sockets muted due to a request in
> > progress
> > > (normal current operation) and sockets muted due to lack of memory.
> > sockets
> > > of the 1st kind would be unmuted at the end of request processing (as
> it
> > > happens right now) but the 2nd kind would require some sort of "unmute
> > > watchdog" which is (i claim) more complicated than a memory pool. also
> a
> > > memory pool is a more generic solution.
> > >
> > > I've updated the KIP page (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> > > to reflect the new proposed implementation, and i've also put up an
> > inital
> >

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-08 Thread radai
Hi Jun,

1. Yes, it is my own personal opinion that people use queued.max.requests
as an indirect way to bound memory consumption. Once a more direct memory
bound mechanism exists (and works) I don't think queued.max.requests would
be required. Having said that, I was not planning on making any changes
w.r.t. queued.max.requests support (so I was aiming to get to a situation
where both configs are supported) to allow gathering enough data/feedback.

2. Selector.poll() calls into KafkaChannel.read() to maybe get a
NetworkReceive. Already in the current code base, multiple such read() calls
may be required before a Receive is produced. My pool implementation is
non-blocking, so if there's no memory available the read() call will return
null and poll() will then move on to try and service other selection keys.
The pool will be checked for available memory again the next time the
SocketServer.run() loop gets to poll(). So right now I don't communicate
memory becoming available to the selector - it will just go on to try and
make progress elsewhere and come back again; I never block it or send it to
sleep. For efficiency, what could maybe be done is: if there's not enough
memory to service a readable selection key, we may want to skip all other
read-ready selection keys for that iteration of pollSelectionKeys(). That
would require rather invasive changes around Selector.pollSelectionKeys()
that I'd rather avoid. Also, different KafkaChannels may be backed by
different memory pools (under some sort of
future QoS scheme?), which would complicate such an optimization further.
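
For illustration, a minimal sketch of this read path under the above
assumptions (simplified, hypothetical classes, not the actual
Selector/KafkaChannel code; end-of-stream and error handling are omitted):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

interface NonBlockingPool {
    ByteBuffer tryAllocate(int size); // returns null if no memory is available right now
}

class PooledReceive {
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4); // the 4-byte size prefix
    private ByteBuffer payload;                                // allocated lazily from the pool

    /** Returns the completed payload, or null if more reads (or memory) are needed. */
    ByteBuffer tryRead(ReadableByteChannel channel, NonBlockingPool pool) throws IOException {
        if (sizeBuf.hasRemaining()) {
            channel.read(sizeBuf);
            if (sizeBuf.hasRemaining())
                return null;                     // size prefix not fully read yet
        }
        if (payload == null) {
            payload = pool.tryAllocate(sizeBuf.getInt(0));
            if (payload == null)
                return null;                     // no memory: poll() moves on, we retry later
        }
        channel.read(payload);
        if (payload.hasRemaining())
            return null;                         // request body not fully read yet
        payload.flip();
        return payload;
    }
}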

3. I added the pool interface and implementation under kafka.common.memory,
and the API is "thin" enough to be generally useful (currently it's
non-blocking only, but a get(long maxWait) is definitely doable). Having
said that, I'm not really familiar enough with the code to say



On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao  wrote:

> Hi, Radi,
>
> Thanks for the update. At the high level, this looks promising. A few
> comments below.
>
> 1. If we can bound the requests by bytes, it seems that we don't need
> queued.max.requests
> any more? Could we just deprecate the config and make the queue size
> unbounded?
> 2. How do we communicate back to the selector when some memory is freed up?
> We probably need to wake up the selector. For efficiency, perhaps we only
> need to wake up the selector if the bufferpool is full?
> 3. We talked about bounding the consumer's memory before. To fully support
> that, we will need to bound the memory used by different fetch responses in
> the consumer. Do you think the changes that you propose here can be
> leveraged to bound the memory in the consumer as well?
>
> Jun
>
>
> On Tue, Aug 30, 2016 at 10:41 AM, radai 
> wrote:
>
> > My apologies for the delay in response.
> >
> > I agree with the concerns about OOM reading from the actual sockets and
> > blocking the network threads - messing with the request queue itself
> would
> > not do.
> >
> > I propose instead a memory pool approach - the broker would have a non
> > blocking memory pool. upon reading the first 4 bytes out of a socket an
> > attempt would be made to acquire enough memory and if that attempt fails
> > the processing thread will move on to try and make progress with other
> > tasks.
> >
> > I think Its simpler than mute/unmute because using mute/unmute would
> > require differentiating between sockets muted due to a request in
> progress
> > (normal current operation) and sockets muted due to lack of memory.
> sockets
> > of the 1st kind would be unmuted at the end of request processing (as it
> > happens right now) but the 2nd kind would require some sort of "unmute
> > watchdog" which is (i claim) more complicated than a memory pool. also a
> > memory pool is a more generic solution.
> >
> > I've updated the KIP page (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> > to reflect the new proposed implementation, and i've also put up an
> inital
> > implementation proposal on github -
> > https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool.
> the
> > proposed code is not complete and tested yet (so probably buggy) but does
> > include the main points of modification.
> >
> > the specific implementation of the pool on that branch also has a built
> in
> > safety net where memory that is acquired but not released (which is a
> bug)
> > is discovered when the garbage collector frees it and the capacity is
> > reclaimed.
> >
> > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao  wrote:
> >
> > > Radi,
> > >
> > > Yes, I got the benefit of bounding the request queue by bytes. My
> concern
> > > is the following if we don't change the behavior of processor blocking
> on
> > > queue full.
> > >
> > > If the broker truly doesn't have enough memory for buffering
> outstanding
> > > requests from all connections, we have to either hit OOM or block the
> > > pr

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-02 Thread Jun Rao
Hi, Radi,

Thanks for the update. At the high level, this looks promising. A few
comments below.

1. If we can bound the requests by bytes, it seems that we don't need
queued.max.requests
any more? Could we just deprecate the config and make the queue size
unbounded?
2. How do we communicate back to the selector when some memory is freed up?
We probably need to wake up the selector. For efficiency, perhaps we only
need to wake up the selector if the bufferpool is full?
3. We talked about bounding the consumer's memory before. To fully support
that, we will need to bound the memory used by different fetch responses in
the consumer. Do you think the changes that you propose here can be
leveraged to bound the memory in the consumer as well?

Jun


On Tue, Aug 30, 2016 at 10:41 AM, radai  wrote:

> My apologies for the delay in response.
>
> I agree with the concerns about OOM reading from the actual sockets and
> blocking the network threads - messing with the request queue itself would
> not do.
>
> I propose instead a memory pool approach - the broker would have a non
> blocking memory pool. upon reading the first 4 bytes out of a socket an
> attempt would be made to acquire enough memory and if that attempt fails
> the processing thread will move on to try and make progress with other
> tasks.
>
> I think Its simpler than mute/unmute because using mute/unmute would
> require differentiating between sockets muted due to a request in progress
> (normal current operation) and sockets muted due to lack of memory. sockets
> of the 1st kind would be unmuted at the end of request processing (as it
> happens right now) but the 2nd kind would require some sort of "unmute
> watchdog" which is (i claim) more complicated than a memory pool. also a
> memory pool is a more generic solution.
>
> I've updated the KIP page (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> to reflect the new proposed implementation, and i've also put up an inital
> implementation proposal on github -
> https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool. the
> proposed code is not complete and tested yet (so probably buggy) but does
> include the main points of modification.
>
> the specific implementation of the pool on that branch also has a built in
> safety net where memory that is acquired but not released (which is a bug)
> is discovered when the garbage collector frees it and the capacity is
> reclaimed.
>
> On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao  wrote:
>
> > Radi,
> >
> > Yes, I got the benefit of bounding the request queue by bytes. My concern
> > is the following if we don't change the behavior of processor blocking on
> > queue full.
> >
> > If the broker truly doesn't have enough memory for buffering outstanding
> > requests from all connections, we have to either hit OOM or block the
> > processor. Both will be bad. I am not sure if one is clearly better than
> > the other. In this case, the solution is probably to expand the cluster
> to
> > reduce the per broker request load.
> >
> > If the broker actually has enough memory, we want to be able to configure
> > the request queue in such a way that it never blocks. You can tell people
> > to just set the request queue to be unbounded, which may scare them. If
> we
> > do want to put a bound, it seems it's easier to configure the queue size
> > based on # requests. Basically, we can tell people to set the queue size
> > based on number of connections. If the queue is based on bytes, it's not
> > clear how people should set it w/o causing the processor to block.
> >
> > Finally, Rajini has a good point. The ByteBuffer in the request object is
> > allocated as soon as we see the first 4 bytes from the socket. So, I am
> not
> > sure if just bounding the request queue itself is enough to bound the
> > memory related to requests.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Mon, Aug 8, 2016 at 4:46 PM, radai 
> wrote:
> >
> > > I agree that filling up the request queue can cause clients to time out
> > > (and presumably retry?). However, for the workloads where we expect
> this
> > > configuration to be useful the alternative is currently an OOM crash.
> > > In my opinion an initial implementation of this feature could be
> > > constrained to a simple drop-in replacement of ArrayBlockingQueue
> > > (conditional, opt-in) and further study of behavior patterns under load
> > can
> > > drive future changes to the API later when those behaviors are better
> > > understood (like back-pressure, nop filler responses to avoid client
> > > timeouts or whatever).
> > >
> > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Nice write up Radai.
> > > > I think what Jun said is a valid concern.
> > > > If I am not wrong as per the proposal, we are depending on the entire
> > > > pipeline to flow smoothly from accepting requests to h

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-30 Thread radai
My apologies for the delay in response.

I agree with the concerns about OOMing while reading from the actual sockets
and about blocking the network threads - messing with the request queue by
itself would not do.

I propose instead a memory pool approach - the broker would have a
non-blocking memory pool. Upon reading the first 4 bytes out of a socket, an
attempt would be made to acquire enough memory, and if that attempt fails
the processing thread will move on to try and make progress with other
tasks.

I think it's simpler than mute/unmute because using mute/unmute would
require differentiating between sockets muted due to a request in progress
(normal current operation) and sockets muted due to lack of memory. Sockets
of the 1st kind would be unmuted at the end of request processing (as
happens right now), but the 2nd kind would require some sort of "unmute
watchdog", which is (I claim) more complicated than a memory pool. Also, a
memory pool is a more generic solution.

I've updated the KIP page (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
to reflect the new proposed implementation, and I've also put up an initial
implementation proposal on GitHub -
https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool. The
proposed code is not complete or fully tested yet (so probably buggy) but
does include the main points of modification.

The specific implementation of the pool on that branch also has a built-in
safety net: memory that is acquired but never released (which is a bug) is
discovered when the garbage collector frees it, and the capacity is
reclaimed.
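
For illustration, a rough sketch of such a safety net (hypothetical names,
not the code on the branch), using weak references and a ReferenceQueue so
that a buffer which is garbage collected without being released returns its
capacity to the pool:

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class GcBackedMemoryPool {
    private final AtomicLong available;
    private final ReferenceQueue<ByteBuffer> collected = new ReferenceQueue<>();
    // every outstanding allocation, keyed by a weak reference to the handed-out buffer
    private final ConcurrentHashMap<Reference<? extends ByteBuffer>, Integer> outstanding =
            new ConcurrentHashMap<>();

    GcBackedMemoryPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    ByteBuffer tryAllocate(int size) {
        reclaimLeaked();                            // opportunistically recover leaked capacity
        while (true) {
            long avail = available.get();
            if (avail < size)
                return null;
            if (available.compareAndSet(avail, avail - size)) {
                ByteBuffer buffer = ByteBuffer.allocate(size);
                outstanding.put(new WeakReference<ByteBuffer>(buffer, collected), size);
                return buffer;
            }
        }
    }

    void release(ByteBuffer buffer) {
        // Linear scan is fine for a sketch; a real pool would index outstanding buffers.
        for (Reference<? extends ByteBuffer> ref : outstanding.keySet()) {
            if (ref.get() == buffer) {
                ref.clear();                        // prevents the reference queue from seeing it
                outstanding.remove(ref);
                break;
            }
        }
        available.addAndGet(buffer.capacity());
    }

    private void reclaimLeaked() {
        Reference<? extends ByteBuffer> ref;
        while ((ref = collected.poll()) != null) {
            Integer size = outstanding.remove(ref);
            if (size != null)                       // buffer was GC'd without release(): a leak
                available.addAndGet(size);
        }
    }
}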

On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao  wrote:

> Radi,
>
> Yes, I got the benefit of bounding the request queue by bytes. My concern
> is the following if we don't change the behavior of processor blocking on
> queue full.
>
> If the broker truly doesn't have enough memory for buffering outstanding
> requests from all connections, we have to either hit OOM or block the
> processor. Both will be bad. I am not sure if one is clearly better than
> the other. In this case, the solution is probably to expand the cluster to
> reduce the per broker request load.
>
> If the broker actually has enough memory, we want to be able to configure
> the request queue in such a way that it never blocks. You can tell people
> to just set the request queue to be unbounded, which may scare them. If we
> do want to put a bound, it seems it's easier to configure the queue size
> based on # requests. Basically, we can tell people to set the queue size
> based on number of connections. If the queue is based on bytes, it's not
> clear how people should set it w/o causing the processor to block.
>
> Finally, Rajini has a good point. The ByteBuffer in the request object is
> allocated as soon as we see the first 4 bytes from the socket. So, I am not
> sure if just bounding the request queue itself is enough to bound the
> memory related to requests.
>
> Thanks,
>
> Jun
>
>
>
> On Mon, Aug 8, 2016 at 4:46 PM, radai  wrote:
>
> > I agree that filling up the request queue can cause clients to time out
> > (and presumably retry?). However, for the workloads where we expect this
> > configuration to be useful the alternative is currently an OOM crash.
> > In my opinion an initial implementation of this feature could be
> > constrained to a simple drop-in replacement of ArrayBlockingQueue
> > (conditional, opt-in) and further study of behavior patterns under load
> can
> > drive future changes to the API later when those behaviors are better
> > understood (like back-pressure, nop filler responses to avoid client
> > timeouts or whatever).
> >
> > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com>
> > wrote:
> >
> > > Nice write up Radai.
> > > I think what Jun said is a valid concern.
> > > If I am not wrong as per the proposal, we are depending on the entire
> > > pipeline to flow smoothly from accepting requests to handling it,
> calling
> > > KafkaApis and handing back the responses.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy 
> wrote:
> > >
> > > > >
> > > > > .
> > > > >>
> > > > >>
> > > > > Hi Becket,
> > > > >
> > > > > I don't think progress can be made in the processor's run loop if
> the
> > > > > queue fills up. i.e., I think Jun's point is that if the queue is
> > full
> > > > > (either due to the proposed max.bytes or today due to max.requests
> > > > hitting
> > > > > the limit) then processCompletedReceives will block and no further
> > > > progress
> > > > > can be made.
> > > > >
> > > >
> > > > I'm sorry - this isn't right. There will be progress as long as the
> API
> > > > handlers are able to pick requests off the request queue and add the
> > > > responses to the response queues (which are effectively unbounded).
> > > > However, the point is valid that blocking in the request channel's
> put
> > > has

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-09 Thread Jun Rao
Radi,

Yes, I got the benefit of bounding the request queue by bytes. My concern
is the following if we don't change the behavior of processor blocking on
queue full.

If the broker truly doesn't have enough memory for buffering outstanding
requests from all connections, we have to either hit OOM or block the
processor. Both will be bad. I am not sure if one is clearly better than
the other. In this case, the solution is probably to expand the cluster to
reduce the per broker request load.

If the broker actually has enough memory, we want to be able to configure
the request queue in such a way that it never blocks. You can tell people
to just set the request queue to be unbounded, which may scare them. If we
do want to put a bound, it seems it's easier to configure the queue size
based on # requests. Basically, we can tell people to set the queue size
based on number of connections. If the queue is based on bytes, it's not
clear how people should set it w/o causing the processor to block.
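
For illustration only (example values, not recommendations), the existing
guidance amounts to something like

  # roughly one queue slot per expected client connection
  queued.max.requests=5000
  socket.request.max.bytes=104857600

whereas the proposed byte-based bound would add something like

  queued.max.bytes=536870912

and it is that value which is hard to pick without risking processor blocking.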

Finally, Rajini has a good point. The ByteBuffer in the request object is
allocated as soon as we see the first 4 bytes from the socket. So, I am not
sure if just bounding the request queue itself is enough to bound the
memory related to requests.

Thanks,

Jun



On Mon, Aug 8, 2016 at 4:46 PM, radai  wrote:

> I agree that filling up the request queue can cause clients to time out
> (and presumably retry?). However, for the workloads where we expect this
> configuration to be useful the alternative is currently an OOM crash.
> In my opinion an initial implementation of this feature could be
> constrained to a simple drop-in replacement of ArrayBlockingQueue
> (conditional, opt-in) and further study of behavior patterns under load can
> drive future changes to the API later when those behaviors are better
> understood (like back-pressure, nop filler responses to avoid client
> timeouts or whatever).
>
> On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > Nice write up Radai.
> > I think what Jun said is a valid concern.
> > If I am not wrong as per the proposal, we are depending on the entire
> > pipeline to flow smoothly from accepting requests to handling it, calling
> > KafkaApis and handing back the responses.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy  wrote:
> >
> > > >
> > > > .
> > > >>
> > > >>
> > > > Hi Becket,
> > > >
> > > > I don't think progress can be made in the processor's run loop if the
> > > > queue fills up. i.e., I think Jun's point is that if the queue is
> full
> > > > (either due to the proposed max.bytes or today due to max.requests
> > > hitting
> > > > the limit) then processCompletedReceives will block and no further
> > > progress
> > > > can be made.
> > > >
> > >
> > > I'm sorry - this isn't right. There will be progress as long as the API
> > > handlers are able to pick requests off the request queue and add the
> > > responses to the response queues (which are effectively unbounded).
> > > However, the point is valid that blocking in the request channel's put
> > has
> > > the effect of exacerbating the pressure on the socket server.
> > >
> > >
> > > >
> > > >>
> > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao  wrote:
> > > >>
> > > >> > Radai,
> > > >> >
> > > >> > Thanks for the proposal. A couple of comments on this.
> > > >> >
> > > >> > 1. Since we store request objects in the request queue, how do we
> > get
> > > an
> > > >> > accurate size estimate for those requests?
> > > >> >
> > > >> > 2. Currently, it's bad if the processor blocks on adding a request
> > to
> > > >> the
> > > >> > request queue. Once blocked, the processor can't process the
> sending
> > > of
> > > >> > responses of other socket keys either. This will cause all clients
> > in
> > > >> this
> > > >> > processor with an outstanding request to eventually timeout.
> > > Typically,
> > > >> > this will trigger client-side retries, which will add more load on
> > the
> > > >> > broker and cause potentially more congestion in the request queue.
> > > With
> > > >> > queued.max.requests, to prevent blocking on the request queue, our
> > > >> > recommendation is to configure queued.max.requests to be the same
> as
> > > the
> > > >> > number of socket connections on the broker. Since the broker never
> > > >> > processes more than 1 request per connection at a time, the
> request
> > > >> queue
> > > >> > will never be blocked. With queued.max.bytes, it's going to be
> > harder
> > > to
> > > >> > configure the value properly to prevent blocking.
> > > >> >
> > > >> > So, while adding queued.max.bytes is potentially useful for memory
> > > >> > management, for it to be truly useful, we probably need to address
> > the
> > > >> > processor blocking issue for it to be really useful in practice.
> One
> > > >> > possibility is to put back-pressure to the client when the request
> > > >> queue is
> > > >> > blocked. For example, 

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-09 Thread Rajini Sivaram
I like the simplicity of the approach and can see that it is an improvement
over the current implementation in typical scenarios. But I would like to
see Jun's proposal to mute sockets explored further. With the proposal in
the KIP to limit queue size, I am not sure how to calculate the total
memory requirements for brokers to avoid OOM, particularly to avoid DoS
attacks in a secure cluster. Even though the KIP says the actual memory
bound is "queued.max.bytes + socket.request.max.bytes", it seems to me
that even with the queue limits, brokers will need close to
(socket.request.max.bytes * n) bytes of memory to guarantee they don't OOM
(where n is the number of connections). That upper limit is no different
from the current implementation. With client quotas that delay responses
and client requests that may be retried due to blocking processors, as soon
as there is space in the request queue that unblocks the processor, the
next select operation could read in a lot of data from a lot of connections.
So controlling the amount of data read from the sockets could be as
important as controlling the request queue size.

I don't mind this being done in two stages, but it will be good to understand
what the final solution would look like before this one is committed.


On Tue, Aug 9, 2016 at 1:58 AM, Becket Qin  wrote:

> Thought about this again. If I understand correctly Jun's concern is about
> the cascading effect. Currently the processor will try to put all the
> requests received in one poll() call into the RequestChannel. This could
> potentially be long if the queue is moving really really slowly. If we
> don't mute the sockets and clients time out then reconnect, the processor
> may read more requests from the new sockets and take even longer to put
> them into the RequestChannel. So it would be good see if we can avoid this
> cascading effects.
>
> On the other end, allowing the processor to keep reading requests from the
> sockets when the RequestChannel is full also seems hurting the memory usage
> control effort.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Aug 8, 2016 at 4:46 PM, radai  wrote:
>
> > I agree that filling up the request queue can cause clients to time out
> > (and presumably retry?). However, for the workloads where we expect this
> > configuration to be useful the alternative is currently an OOM crash.
> > In my opinion an initial implementation of this feature could be
> > constrained to a simple drop-in replacement of ArrayBlockingQueue
> > (conditional, opt-in) and further study of behavior patterns under load
> can
> > drive future changes to the API later when those behaviors are better
> > understood (like back-pressure, nop filler responses to avoid client
> > timeouts or whatever).
> >
> > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com>
> > wrote:
> >
> > > Nice write up Radai.
> > > I think what Jun said is a valid concern.
> > > If I am not wrong as per the proposal, we are depending on the entire
> > > pipeline to flow smoothly from accepting requests to handling it,
> calling
> > > KafkaApis and handing back the responses.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy 
> wrote:
> > >
> > > > >
> > > > > .
> > > > >>
> > > > >>
> > > > > Hi Becket,
> > > > >
> > > > > I don't think progress can be made in the processor's run loop if
> the
> > > > > queue fills up. i.e., I think Jun's point is that if the queue is
> > full
> > > > > (either due to the proposed max.bytes or today due to max.requests
> > > > hitting
> > > > > the limit) then processCompletedReceives will block and no further
> > > > progress
> > > > > can be made.
> > > > >
> > > >
> > > > I'm sorry - this isn't right. There will be progress as long as the
> API
> > > > handlers are able to pick requests off the request queue and add the
> > > > responses to the response queues (which are effectively unbounded).
> > > > However, the point is valid that blocking in the request channel's
> put
> > > has
> > > > the effect of exacerbating the pressure on the socket server.
> > > >
> > > >
> > > > >
> > > > >>
> > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao 
> wrote:
> > > > >>
> > > > >> > Radai,
> > > > >> >
> > > > >> > Thanks for the proposal. A couple of comments on this.
> > > > >> >
> > > > >> > 1. Since we store request objects in the request queue, how do
> we
> > > get
> > > > an
> > > > >> > accurate size estimate for those requests?
> > > > >> >
> > > > >> > 2. Currently, it's bad if the processor blocks on adding a
> request
> > > to
> > > > >> the
> > > > >> > request queue. Once blocked, the processor can't process the
> > sending
> > > > of
> > > > >> > responses of other socket keys either. This will cause all
> clients
> > > in
> > > > >> this
> > > > >> > processor with an outstanding request to eventually timeout.
> > > > Typically,
> > > > >> > this will trigger client-side retries, which will add

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-08 Thread Becket Qin
I thought about this again. If I understand correctly, Jun's concern is about
the cascading effect. Currently the processor will try to put all the
requests received in one poll() call into the RequestChannel. This could
potentially take a long time if the queue is moving really slowly. If we
don't mute the sockets and clients time out and then reconnect, the processor
may read more requests from the new sockets and take even longer to put
them into the RequestChannel. So it would be good to see if we can avoid this
cascading effect.

On the other hand, allowing the processor to keep reading requests from the
sockets when the RequestChannel is full also seems to hurt the memory usage
control effort.

Thanks,

Jiangjie (Becket) Qin

On Mon, Aug 8, 2016 at 4:46 PM, radai  wrote:

> I agree that filling up the request queue can cause clients to time out
> (and presumably retry?). However, for the workloads where we expect this
> configuration to be useful the alternative is currently an OOM crash.
> In my opinion an initial implementation of this feature could be
> constrained to a simple drop-in replacement of ArrayBlockingQueue
> (conditional, opt-in) and further study of behavior patterns under load can
> drive future changes to the API later when those behaviors are better
> understood (like back-pressure, nop filler responses to avoid client
> timeouts or whatever).
>
> On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > Nice write up Radai.
> > I think what Jun said is a valid concern.
> > If I am not wrong as per the proposal, we are depending on the entire
> > pipeline to flow smoothly from accepting requests to handling it, calling
> > KafkaApis and handing back the responses.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy  wrote:
> >
> > > >
> > > > .
> > > >>
> > > >>
> > > > Hi Becket,
> > > >
> > > > I don't think progress can be made in the processor's run loop if the
> > > > queue fills up. i.e., I think Jun's point is that if the queue is
> full
> > > > (either due to the proposed max.bytes or today due to max.requests
> > > hitting
> > > > the limit) then processCompletedReceives will block and no further
> > > progress
> > > > can be made.
> > > >
> > >
> > > I'm sorry - this isn't right. There will be progress as long as the API
> > > handlers are able to pick requests off the request queue and add the
> > > responses to the response queues (which are effectively unbounded).
> > > However, the point is valid that blocking in the request channel's put
> > has
> > > the effect of exacerbating the pressure on the socket server.
> > >
> > >
> > > >
> > > >>
> > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao  wrote:
> > > >>
> > > >> > Radai,
> > > >> >
> > > >> > Thanks for the proposal. A couple of comments on this.
> > > >> >
> > > >> > 1. Since we store request objects in the request queue, how do we
> > get
> > > an
> > > >> > accurate size estimate for those requests?
> > > >> >
> > > >> > 2. Currently, it's bad if the processor blocks on adding a request
> > to
> > > >> the
> > > >> > request queue. Once blocked, the processor can't process the
> sending
> > > of
> > > >> > responses of other socket keys either. This will cause all clients
> > in
> > > >> this
> > > >> > processor with an outstanding request to eventually timeout.
> > > Typically,
> > > >> > this will trigger client-side retries, which will add more load on
> > the
> > > >> > broker and cause potentially more congestion in the request queue.
> > > With
> > > >> > queued.max.requests, to prevent blocking on the request queue, our
> > > >> > recommendation is to configure queued.max.requests to be the same
> as
> > > the
> > > >> > number of socket connections on the broker. Since the broker never
> > > >> > processes more than 1 request per connection at a time, the
> request
> > > >> queue
> > > >> > will never be blocked. With queued.max.bytes, it's going to be
> > harder
> > > to
> > > >> > configure the value properly to prevent blocking.
> > > >> >
> > > >> > So, while adding queued.max.bytes is potentially useful for memory
> > > >> > management, for it to be truly useful, we probably need to address
> > the
> > > >> > processor blocking issue for it to be really useful in practice.
> One
> > > >> > possibility is to put back-pressure to the client when the request
> > > >> queue is
> > > >> > blocked. For example, if the processor notices that the request
> > queue
> > > is
> > > >> > full, it can turn off the interest bit for read for all socket
> keys.
> > > >> This
> > > >> > will allow the processor to continue handling responses. When the
> > > >> request
> > > >> > queue has space again, it can indicate the new state to the
> process
> > > and
> > > >> > wake up the selector. Not sure how this will work with multiple
> > > >> processors
> > > >> > though since the request queue is shared across all processors.
> > > >> >
> > > >> > Thanks,

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-08 Thread radai
I agree that filling up the request queue can cause clients to time out
(and presumably retry?). However, for the workloads where we expect this
configuration to be useful the alternative is currently an OOM crash.
In my opinion an initial implementation of this feature could be
constrained to a simple drop-in replacement of ArrayBlockingQueue
(conditional, opt-in) and further study of behavior patterns under load can
drive future changes to the API later when those behaviors are better
understood (like back-pressure, nop filler responses to avoid client
timeouts or whatever).

On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat 
wrote:

> Nice write up Radai.
> I think what Jun said is a valid concern.
> If I am not wrong as per the proposal, we are depending on the entire
> pipeline to flow smoothly from accepting requests to handling it, calling
> KafkaApis and handing back the responses.
>
> Thanks,
>
> Mayuresh
>
>
> On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy  wrote:
>
> > >
> > > .
> > >>
> > >>
> > > Hi Becket,
> > >
> > > I don't think progress can be made in the processor's run loop if the
> > > queue fills up. i.e., I think Jun's point is that if the queue is full
> > > (either due to the proposed max.bytes or today due to max.requests
> > hitting
> > > the limit) then processCompletedReceives will block and no further
> > progress
> > > can be made.
> > >
> >
> > I'm sorry - this isn't right. There will be progress as long as the API
> > handlers are able to pick requests off the request queue and add the
> > responses to the response queues (which are effectively unbounded).
> > However, the point is valid that blocking in the request channel's put
> has
> > the effect of exacerbating the pressure on the socket server.
> >
> >
> > >
> > >>
> > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao  wrote:
> > >>
> > >> > Radai,
> > >> >
> > >> > Thanks for the proposal. A couple of comments on this.
> > >> >
> > >> > 1. Since we store request objects in the request queue, how do we
> get
> > an
> > >> > accurate size estimate for those requests?
> > >> >
> > >> > 2. Currently, it's bad if the processor blocks on adding a request
> to
> > >> the
> > >> > request queue. Once blocked, the processor can't process the sending
> > of
> > >> > responses of other socket keys either. This will cause all clients
> in
> > >> this
> > >> > processor with an outstanding request to eventually timeout.
> > Typically,
> > >> > this will trigger client-side retries, which will add more load on
> the
> > >> > broker and cause potentially more congestion in the request queue.
> > With
> > >> > queued.max.requests, to prevent blocking on the request queue, our
> > >> > recommendation is to configure queued.max.requests to be the same as
> > the
> > >> > number of socket connections on the broker. Since the broker never
> > >> > processes more than 1 request per connection at a time, the request
> > >> queue
> > >> > will never be blocked. With queued.max.bytes, it's going to be
> harder
> > to
> > >> > configure the value properly to prevent blocking.
> > >> >
> > >> > So, while adding queued.max.bytes is potentially useful for memory
> > >> > management, for it to be truly useful, we probably need to address
> the
> > >> > processor blocking issue for it to be really useful in practice. One
> > >> > possibility is to put back-pressure to the client when the request
> > >> queue is
> > >> > blocked. For example, if the processor notices that the request
> queue
> > is
> > >> > full, it can turn off the interest bit for read for all socket keys.
> > >> This
> > >> > will allow the processor to continue handling responses. When the
> > >> request
> > >> > queue has space again, it can indicate the new state to the process
> > and
> > >> > wake up the selector. Not sure how this will work with multiple
> > >> processors
> > >> > though since the request queue is shared across all processors.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Jun
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai 
> > >> wrote:
> > >> >
> > >> > > Hello,
> > >> > >
> > >> > > I'd like to initiate a discussion about
> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> > >> > >
> > >> > > The goal of the KIP is to allow configuring a bound on the
> capacity
> > >> (as
> > >> > in
> > >> > > bytes of memory used) of the incoming request queue, in addition
> to
> > >> the
> > >> > > current bound on the number of messages.
> > >> > >
> > >> > > This comes after several incidents at Linkedin where a sudden
> > "spike"
> > >> of
> > >> > > large message batches caused an out of memory exception.
> > >> > >
> > >> > > Thank you,
> > >> > >
> > >> > >Radai
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-08 Thread Mayuresh Gharat
Nice write-up, Radai.
I think what Jun said is a valid concern.
If I am not wrong, as per the proposal we are depending on the entire
pipeline to flow smoothly, from accepting requests to handling them, calling
KafkaApis, and handing back the responses.

Thanks,

Mayuresh


On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy  wrote:

> >
> > .
> >>
> >>
> > Hi Becket,
> >
> > I don't think progress can be made in the processor's run loop if the
> > queue fills up. i.e., I think Jun's point is that if the queue is full
> > (either due to the proposed max.bytes or today due to max.requests
> hitting
> > the limit) then processCompletedReceives will block and no further
> progress
> > can be made.
> >
>
> I'm sorry - this isn't right. There will be progress as long as the API
> handlers are able to pick requests off the request queue and add the
> responses to the response queues (which are effectively unbounded).
> However, the point is valid that blocking in the request channel's put has
> the effect of exacerbating the pressure on the socket server.
>
>
> >
> >>
> >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao  wrote:
> >>
> >> > Radai,
> >> >
> >> > Thanks for the proposal. A couple of comments on this.
> >> >
> >> > 1. Since we store request objects in the request queue, how do we get
> an
> >> > accurate size estimate for those requests?
> >> >
> >> > 2. Currently, it's bad if the processor blocks on adding a request to
> >> the
> >> > request queue. Once blocked, the processor can't process the sending
> of
> >> > responses of other socket keys either. This will cause all clients in
> >> this
> >> > processor with an outstanding request to eventually timeout.
> Typically,
> >> > this will trigger client-side retries, which will add more load on the
> >> > broker and cause potentially more congestion in the request queue.
> With
> >> > queued.max.requests, to prevent blocking on the request queue, our
> >> > recommendation is to configure queued.max.requests to be the same as
> the
> >> > number of socket connections on the broker. Since the broker never
> >> > processes more than 1 request per connection at a time, the request
> >> queue
> >> > will never be blocked. With queued.max.bytes, it's going to be harder
> to
> >> > configure the value properly to prevent blocking.
> >> >
> >> > So, while adding queued.max.bytes is potentially useful for memory
> >> > management, for it to be truly useful, we probably need to address the
> >> > processor blocking issue for it to be really useful in practice. One
> >> > possibility is to put back-pressure to the client when the request
> >> queue is
> >> > blocked. For example, if the processor notices that the request queue
> is
> >> > full, it can turn off the interest bit for read for all socket keys.
> >> This
> >> > will allow the processor to continue handling responses. When the
> >> request
> >> > queue has space again, it can indicate the new state to the process
> and
> >> > wake up the selector. Not sure how this will work with multiple
> >> processors
> >> > though since the request queue is shared across all processors.
> >> >
> >> > Thanks,
> >> >
> >> > Jun
> >> >
> >> >
> >> >
> >> > On Thu, Aug 4, 2016 at 11:28 AM, radai 
> >> wrote:
> >> >
> >> > > Hello,
> >> > >
> >> > > I'd like to initiate a discussion about
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> >> > >
> >> > > The goal of the KIP is to allow configuring a bound on the capacity
> >> (as
> >> > in
> >> > > bytes of memory used) of the incoming request queue, in addition to
> >> the
> >> > > current bound on the number of messages.
> >> > >
> >> > > This comes after several incidents at Linkedin where a sudden
> "spike"
> >> of
> >> > > large message batches caused an out of memory exception.
> >> > >
> >> > > Thank you,
> >> > >
> >> > >Radai
> >> > >
> >> >
> >>
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-08 Thread Joel Koshy
>
> .
>>
>>
> Hi Becket,
>
> I don't think progress can be made in the processor's run loop if the
> queue fills up. i.e., I think Jun's point is that if the queue is full
> (either due to the proposed max.bytes or today due to max.requests hitting
> the limit) then processCompletedReceives will block and no further progress
> can be made.
>

I'm sorry - this isn't right. There will be progress as long as the API
handlers are able to pick requests off the request queue and add the
responses to the response queues (which are effectively unbounded).
However, the point is valid that blocking in the request channel's put has
the effect of exacerbating the pressure on the socket server.


>
>>
>> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao  wrote:
>>
>> > Radai,
>> >
>> > Thanks for the proposal. A couple of comments on this.
>> >
>> > 1. Since we store request objects in the request queue, how do we get an
>> > accurate size estimate for those requests?
>> >
>> > 2. Currently, it's bad if the processor blocks on adding a request to
>> the
>> > request queue. Once blocked, the processor can't process the sending of
>> > responses of other socket keys either. This will cause all clients in
>> this
>> > processor with an outstanding request to eventually timeout. Typically,
>> > this will trigger client-side retries, which will add more load on the
>> > broker and cause potentially more congestion in the request queue. With
>> > queued.max.requests, to prevent blocking on the request queue, our
>> > recommendation is to configure queued.max.requests to be the same as the
>> > number of socket connections on the broker. Since the broker never
>> > processes more than 1 request per connection at a time, the request
>> queue
>> > will never be blocked. With queued.max.bytes, it's going to be harder to
>> > configure the value properly to prevent blocking.
>> >
>> > So, while adding queued.max.bytes is potentially useful for memory
>> > management, for it to be truly useful, we probably need to address the
>> > processor blocking issue for it to be really useful in practice. One
>> > possibility is to put back-pressure to the client when the request
>> queue is
>> > blocked. For example, if the processor notices that the request queue is
>> > full, it can turn off the interest bit for read for all socket keys.
>> This
>> > will allow the processor to continue handling responses. When the
>> request
>> > queue has space again, it can indicate the new state to the process and
>> > wake up the selector. Not sure how this will work with multiple
>> processors
>> > though since the request queue is shared across all processors.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> >
>> >
>> > On Thu, Aug 4, 2016 at 11:28 AM, radai 
>> wrote:
>> >
>> > > Hello,
>> > >
>> > > I'd like to initiate a discussion about
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
>> > >
>> > > The goal of the KIP is to allow configuring a bound on the capacity
>> (as
>> > in
>> > > bytes of memory used) of the incoming request queue, in addition to
>> the
>> > > current bound on the number of messages.
>> > >
>> > > This comes after several incidents at Linkedin where a sudden "spike"
>> of
>> > > large message batches caused an out of memory exception.
>> > >
>> > > Thank you,
>> > >
>> > >Radai
>> > >
>> >
>>
>
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-08 Thread Joel Koshy
>
>
>
> 2. Good point about the consequence when the processor threads are
> blocking. I agree it would be important to keep the processor thread
> running, but I am not sure if it would be a problem of the current
> proposal. In most of the time, the request queue should be close to empty,
> so the processor won't block. In cases where the queue is throttled because
> of the bytes limit, the processor thread should only slow down but not
> block for long, i.e. they will be able to make progress as fast as the
> request handler threads.  At that point, the broker is already processing
> the requests at maximum speed. If the clients experiences timeout, the same
> thing will likely happen even without the byte limit or we disable READ
> from the sockets. The only difference is that the broker won't have OOM if
> we have the bytes limit.
>
>
Hi Becket,

I don't think progress can be made in the processor's run loop if the queue
fills up. i.e., I think Jun's point is that if the queue is full (either
due to the proposed max.bytes or today due to max.requests hitting the
limit) then processCompletedReceives will block and no further progress can
be made.

Joel


>
> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao  wrote:
>
> > Radai,
> >
> > Thanks for the proposal. A couple of comments on this.
> >
> > 1. Since we store request objects in the request queue, how do we get an
> > accurate size estimate for those requests?
> >
> > 2. Currently, it's bad if the processor blocks on adding a request to the
> > request queue. Once blocked, the processor can't process the sending of
> > responses of other socket keys either. This will cause all clients in
> this
> > processor with an outstanding request to eventually timeout. Typically,
> > this will trigger client-side retries, which will add more load on the
> > broker and cause potentially more congestion in the request queue. With
> > queued.max.requests, to prevent blocking on the request queue, our
> > recommendation is to configure queued.max.requests to be the same as the
> > number of socket connections on the broker. Since the broker never
> > processes more than 1 request per connection at a time, the request queue
> > will never be blocked. With queued.max.bytes, it's going to be harder to
> > configure the value properly to prevent blocking.
> >
> > So, while adding queued.max.bytes is potentially useful for memory
> > management, for it to be truly useful, we probably need to address the
> > processor blocking issue for it to be really useful in practice. One
> > possibility is to put back-pressure to the client when the request queue
> is
> > blocked. For example, if the processor notices that the request queue is
> > full, it can turn off the interest bit for read for all socket keys. This
> > will allow the processor to continue handling responses. When the request
> > queue has space again, it can indicate the new state to the process and
> > wake up the selector. Not sure how this will work with multiple
> processors
> > though since the request queue is shared across all processors.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Thu, Aug 4, 2016 at 11:28 AM, radai 
> wrote:
> >
> > > Hello,
> > >
> > > I'd like to initiate a discussion about
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> > >
> > > The goal of the KIP is to allow configuring a bound on the capacity (as
> > in
> > > bytes of memory used) of the incoming request queue, in addition to the
> > > current bound on the number of messages.
> > >
> > > This comes after several incidents at Linkedin where a sudden "spike"
> of
> > > large message batches caused an out of memory exception.
> > >
> > > Thank you,
> > >
> > >Radai
> > >
> >
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-08 Thread Becket Qin
Hi Jun,

1. The requests in the queue are RequestChannel.Request objects. Each one
contains the raw ByteBuffer from the socket, so we can probably just use
that. This might not be 100% accurate about the memory taken, but it is
probably good enough.

2. Good point about the consequences when the processor threads are
blocking. I agree it would be important to keep the processor threads
running, but I am not sure it would be a problem for the current
proposal. Most of the time the request queue should be close to empty,
so the processors won't block. In cases where the queue is throttled because
of the bytes limit, the processor threads should only slow down but not
block for long, i.e. they will be able to make progress as fast as the
request handler threads. At that point, the broker is already processing
requests at maximum speed. If the clients experience timeouts, the same
thing will likely happen even without the byte limit or if we disable READ
on the sockets. The only difference is that the broker won't hit OOM if
we have the bytes limit.

Thanks,

Jiangjie (Becket) Qin



On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao  wrote:

> Radai,
>
> Thanks for the proposal. A couple of comments on this.
>
> 1. Since we store request objects in the request queue, how do we get an
> accurate size estimate for those requests?
>
> 2. Currently, it's bad if the processor blocks on adding a request to the
> request queue. Once blocked, the processor can't process the sending of
> responses of other socket keys either. This will cause all clients in this
> processor with an outstanding request to eventually timeout. Typically,
> this will trigger client-side retries, which will add more load on the
> broker and cause potentially more congestion in the request queue. With
> queued.max.requests, to prevent blocking on the request queue, our
> recommendation is to configure queued.max.requests to be the same as the
> number of socket connections on the broker. Since the broker never
> processes more than 1 request per connection at a time, the request queue
> will never be blocked. With queued.max.bytes, it's going to be harder to
> configure the value properly to prevent blocking.
>
> So, while adding queued.max.bytes is potentially useful for memory
> management, for it to be truly useful, we probably need to address the
> processor blocking issue for it to be really useful in practice. One
> possibility is to put back-pressure to the client when the request queue is
> blocked. For example, if the processor notices that the request queue is
> full, it can turn off the interest bit for read for all socket keys. This
> will allow the processor to continue handling responses. When the request
> queue has space again, it can indicate the new state to the process and
> wake up the selector. Not sure how this will work with multiple processors
> though since the request queue is shared across all processors.
>
> Thanks,
>
> Jun
>
>
>
> On Thu, Aug 4, 2016 at 11:28 AM, radai  wrote:
>
> > Hello,
> >
> > I'd like to initiate a discussion about
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> >
> > The goal of the KIP is to allow configuring a bound on the capacity (as
> in
> > bytes of memory used) of the incoming request queue, in addition to the
> > current bound on the number of messages.
> >
> > This comes after several incidents at Linkedin where a sudden "spike" of
> > large message batches caused an out of memory exception.
> >
> > Thank you,
> >
> >Radai
> >
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-08 Thread Jun Rao
Radai,

Thanks for the proposal. A couple of comments on this.

1. Since we store request objects in the request queue, how do we get an
accurate size estimate for those requests?

2. Currently, it's bad if the processor blocks on adding a request to the
request queue. Once blocked, the processor can't process the sending of
responses of other socket keys either. This will cause all clients in this
processor with an outstanding request to eventually timeout. Typically,
this will trigger client-side retries, which will add more load on the
broker and cause potentially more congestion in the request queue. With
queued.max.requests, to prevent blocking on the request queue, our
recommendation is to configure queued.max.requests to be the same as the
number of socket connections on the broker. Since the broker never
processes more than 1 request per connection at a time, the request queue
will never be blocked. With queued.max.bytes, it's going to be harder to
configure the value properly to prevent blocking.

So, while adding queued.max.bytes is potentially useful for memory
management, for it to be truly useful, we probably need to address the
processor blocking issue for it to be really useful in practice. One
possibility is to put back-pressure to the client when the request queue is
blocked. For example, if the processor notices that the request queue is
full, it can turn off the interest bit for read for all socket keys. This
will allow the processor to continue handling responses. When the request
queue has space again, it can indicate the new state to the processor and
wake up the selector. Not sure how this will work with multiple processors
though since the request queue is shared across all processors.
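
For illustration, a simplified single-threaded sketch of that interest-bit
idea (hypothetical code, not the actual SocketServer/Processor classes): the
processor checks the queue state at the top of each poll() iteration and
mutes or unmutes READ on its keys accordingly.

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class ReadBackpressure {
    private boolean readsMuted = false;

    /** Called from the processor thread at the start of each poll() iteration. */
    void update(Selector selector, boolean requestQueueFull) {
        if (requestQueueFull == readsMuted)
            return;                                     // nothing changed since last check
        readsMuted = requestQueueFull;
        for (SelectionKey key : selector.keys()) {
            if (!key.isValid())
                continue;
            int ops = key.interestOps();
            key.interestOps(readsMuted ? ops & ~SelectionKey.OP_READ   // stop reading: back-pressure
                                       : ops | SelectionKey.OP_READ);  // queue drained: resume reads
        }
        // Note: a real implementation must not unmute sockets that were muted
        // for other reasons (e.g. a request from that connection is already in flight).
    }
}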

Thanks,

Jun



On Thu, Aug 4, 2016 at 11:28 AM, radai  wrote:

> Hello,
>
> I'd like to initiate a discussion about
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
>
> The goal of the KIP is to allow configuring a bound on the capacity (as in
> bytes of memory used) of the incoming request queue, in addition to the
> current bound on the number of messages.
>
> This comes after several incidents at Linkedin where a sudden "spike" of
> large message batches caused an out of memory exception.
>
> Thank you,
>
>Radai
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-05 Thread radai
Supporting both configs at the same time to begin with would allow enough
time for users to experiment with the settings. If in the future it's deemed
that a memory bound is far superior to a #req bound (which is my opinion,
but needs evidence to support it), the migration discussion could be
deferred to that point, making the immediate impact tiny (the config upgrade
is trivial, behavior remains backwards compatible, and the new feature is
"opt-in").

I will extend the Unit tests (and fix the bugs) as part of the
implementation.

As for the performance implications:
1. my initial runs (https://github.com/radai-rosenblatt/kafka-benchmarks)
show ~5Mops/sec using the current queue (ArrayBlockingQueueBenchmark) and
~400Kops/sec using the proposed queue data structure
(ByteBoundedBlockingQueueBenchmark). this is using 16 producer threads and
24 consumer threads on my machine.
2. ByteBoundedBlockingQueue uses composition on top of ArrayBlockingQueue,
since ArrayBlockingQueue is not easily extendable. this leads to the double
locking you mentioned. an alternative would be to basically copy-paste
ArrayBlockingQueue and add the functionality (as done in
https://github.com/radai-rosenblatt/kafka-benchmarks/blob/master/src/main/java/net/radai/kafka/MetricBoundedArrayBlockingQueue.java).
this showed no noticeable impact on queue performance (also covered in my
benchmark code) but is far less elegant (a rough sketch of the composition
approach follows after this list).
3. 400Kops/sec is still more than enough for realistic kafka installations,
and the effect can be further mitigated by using ByteBoundedBlockingQueue
only if the configuration is set (so there is no impact on default
installations, where the feature is "opt in").
4. As part of the implementation I will also deploy the proposed code in a
test env here (linkedin) and report back on measured throughput vs a
vanilla version
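
for reference, a rough sketch of the composition approach from point 2
(illustrative only -- not the actual ByteBoundedBlockingQueue, and the names
are made up). the byte budget is enforced by a semaphore layered on top of
the delegate queue, which is where the double coordination cost comes from:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.ToIntFunction;

class ByteBoundedQueueSketch<E> {
    private final ArrayBlockingQueue<E> delegate; // element bound + its own lock
    private final Semaphore byteBudget;           // byte bound, a second layer
    private final ToIntFunction<E> sizeOf;

    ByteBoundedQueueSketch(int maxElements, int maxBytes, ToIntFunction<E> sizeOf) {
        this.delegate = new ArrayBlockingQueue<>(maxElements);
        this.byteBudget = new Semaphore(maxBytes);
        this.sizeOf = sizeOf;
    }

    void put(E e) throws InterruptedException {
        int size = sizeOf.applyAsInt(e);
        byteBudget.acquire(size);      // blocks until enough bytes are free
        try {
            delegate.put(e);           // blocks until an element slot is free
        } catch (InterruptedException ie) {
            byteBudget.release(size);  // don't leak the byte budget on failure
            throw ie;
        }
    }

    E take() throws InterruptedException {
        E e = delegate.take();
        byteBudget.release(sizeOf.applyAsInt(e));
        return e;
    }
}

a real implementation would also need to admit a single element larger than
the byte bound (otherwise such an element would block forever) and provide
offer/poll variants with timeouts; both are omitted here for brevity.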

On Fri, Aug 5, 2016 at 8:38 AM, Ismael Juma  wrote:

> Hi Radai,
>
> Thanks for the proposal. I think it makes sense to be able to limit the
> request queue by bytes. I haven't made up my mind on whether having both
> limits is better than having a single one yet, but we probably need to make
> a call on that before we can start a vote.
>
> Just a quick point with regards to ByteBoundedBlockingQueue. It's not
> currently used in the code and the existing unit tests are very basic.
> Also, looking at the implementation, it has two obvious bugs (it fails if
> one passes None for `sizeFunction`, and `currentByteSize` is not final when
> it should be to comply with the JMM). So, we probably need to extend the
> unit tests for that class if we want to use it (ideally there would be
> multi-threaded tests too). It is also a bit inefficient, as it adds
> additional locks on top of the LinkedBlockingQueue ones, but the benchmarks
> should hopefully show if that is something that needs to be addressed or
> not.
>
> Ismael
>
> On Thu, Aug 4, 2016 at 7:28 PM, radai  wrote:
>
> > Hello,
> >
> > I'd like to initiate a discussion about
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> >
> > The goal of the KIP is to allow configuring a bound on the capacity (as
> in
> > bytes of memory used) of the incoming request queue, in addition to the
> > current bound on the number of messages.
> >
> > This comes after several incidents at Linkedin where a sudden "spike" of
> > large message batches caused an out of memory exception.
> >
> > Thank you,
> >
> >Radai
> >
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-05 Thread Onur Karaman
Thanks for the proposal, Radai. +1

On Fri, Aug 5, 2016 at 8:38 AM, Ismael Juma  wrote:

> Hi Radai,
>
> Thanks for the proposal. I think it makes sense to be able to limit the
> request queue by bytes. I haven't made up my mind on whether having both
> limits is better than having a single one yet, but we probably need to make
> a call on that before we can start a vote.
>
> Just a quick point with regards to ByteBoundedBlockingQueue. It's not
> currently used in the code and the existing unit tests are very basic.
> Also, looking at the implementation, it has two obvious bugs (it fails if
> one passes None for `sizeFunction`, and `currentByteSize` is not final when
> it should be to comply with the JMM). So, we probably need to extend the
> unit tests for that class if we want to use it (ideally there would be
> multi-threaded tests too). It is also a bit inefficient, as it adds
> additional locks on top of the LinkedBlockingQueue ones, but the benchmarks
> should hopefully show if that is something that needs to be addressed or
> not.
>
> Ismael
>
> On Thu, Aug 4, 2016 at 7:28 PM, radai  wrote:
>
> > Hello,
> >
> > I'd like to initiate a discussion about
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> >
> > The goal of the KIP is to allow configuring a bound on the capacity (as
> in
> > bytes of memory used) of the incoming request queue, in addition to the
> > current bound on the number of messages.
> >
> > This comes after several incidents at Linkedin where a sudden "spike" of
> > large message batches caused an out of memory exception.
> >
> > Thank you,
> >
> >Radai
> >
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-05 Thread Ismael Juma
Hi Radai,

Thanks for the proposal. I think it makes sense to be able to limit the
request queue by bytes. I haven't made up my mind on whether having both
limits is better than having a single one yet, but we probably need to make
a call on that before we can start a vote.

Just a quick point with regards to ByteBoundedBlockingQueue. It's not
currently used in the code and the existing unit tests are very basic.
Also, looking at the implementation, it has two obvious bugs (it fails if
one passes None for `sizeFunction`, and `currentByteSize` is not final when
it should be to comply with the JMM). So, we probably need to extend the
unit tests for that class if we want to use it (ideally there would be
multi-threaded tests too). It is also a bit inefficient, as it adds
additional locks on top of the LinkedBlockingQueue ones, but the benchmarks
should hopefully show if that is something that needs to be addressed or
not.
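
To illustrate the JMM point with a generic example (not the actual class): a
reference assigned in the constructor and then read by other threads without
further synchronization is only guaranteed to be visible if the field is
final, e.g.:

import java.util.concurrent.atomic.AtomicLong;

class ByteCounterHolder {
    // Declaring the field final gives safe publication under the JMM: any
    // thread that sees a ByteCounterHolder reference also sees a fully
    // constructed AtomicLong, even if the holder was published via a data race.
    private final AtomicLong currentByteSize = new AtomicLong(0);

    long add(long bytes)    { return currentByteSize.addAndGet(bytes); }
    long remove(long bytes) { return currentByteSize.addAndGet(-bytes); }
}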

Ismael

On Thu, Aug 4, 2016 at 7:28 PM, radai  wrote:

> Hello,
>
> I'd like to initiate a discussion about
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
>
> The goal of the KIP is to allow configuring a bound on the capacity (as in
> bytes of memory used) of the incoming request queue, in addition to the
> current bound on the number of messages.
>
> This comes after several incidents at Linkedin where a sudden "spike" of
> large message batches caused an out of memory exception.
>
> Thank you,
>
>Radai
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-05 Thread Grant Henke
The proposal makes sense to me. I like the plan to support both limits
simultaneously:

queued.max.requests is supported in addition to queued.max.bytes (both
> respected at the same time). In this case a default value of
> queued.max.bytes = -1 would maintain backwards compatible behavior.


Especially since ByteBoundedBlockingQueue already supports a limit on byte
and element count.
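
For example, a configuration along these lines (values illustrative only)
would opt in to the new byte bound while keeping the existing count bound:

  # existing element-count bound, kept as-is
  queued.max.requests=500
  # proposed byte bound; the proposed default of -1 disables it, preserving
  # current behavior for anyone who does not set it
  queued.max.bytes=104857600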

I actually don't mind the property naming of queued.max.*. I am not sure if
the added clarity of requestQueue.max.* is worth the migration.





On Fri, Aug 5, 2016 at 9:34 AM, Tom Crayford  wrote:

> This makes good sense to me, and seems to have very low amounts of downside
> with large amounts of upside. +1
>
> On Thursday, 4 August 2016, radai  wrote:
>
> > Hello,
> >
> > I'd like to initiate a discussion about
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
> >
> > The goal of the KIP is to allow configuring a bound on the capacity (as
> in
> > bytes of memory used) of the incoming request queue, in addition to the
> > current bound on the number of messages.
> >
> > This comes after several incidents at Linkedin where a sudden "spike" of
> > large message batches caused an out of memory exception.
> >
> > Thank you,
> >
> >Radai
> >
>



-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-08-05 Thread Tom Crayford
This makes good sense to me, and seems to have very low amounts of downside
with large amounts of upside. +1

On Thursday, 4 August 2016, radai  wrote:

> Hello,
>
> I'd like to initiate a discussion about
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
>
> The goal of the KIP is to allow configuring a bound on the capacity (as in
> bytes of memory used) of the incoming request queue, in addition to the
> current bound on the number of messages.
>
> This comes after several incidents at Linkedin where a sudden "spike" of
> large message batches caused an out of memory exception.
>
> Thank you,
>
>Radai
>