Re: KStream Close Processor

2016-04-11 Thread Guozhang Wang
Yeah, we can definitely do better in documentation. Regarding the API
changes, I would prefer to hold off and think through whether such use cases
are a common pattern, and whether we can re-order the closing process to get
around the issue I mentioned above, if it is required.

Guozhang

On Mon, Apr 11, 2016 at 4:16 AM, Matthias J. Sax 
wrote:

> What about extending the API with a method beforeClose() that enables
> the user to flush buffered data?
>
> Maybe we can also rename close() to afterClose(), to make the difference
> clear. At least, we should document when close() is called -- from a
> user point of view, I would expect that close() allows flushing data,
> which is not the case.
>
> -Matthias
>
> On 04/11/2016 02:30 AM, Guozhang Wang wrote:
> > Re 1), Kafka Streams intentionally closes all underlying clients before
> > closing processors, since part of closing the processors requires shutting
> > down their processor state managers; for example, we need to make sure the
> > producer's message sends have all been acked before the state manager
> > records the changelog sent offsets. To complement this we trigger
> > commitAll() before closing the clients.
> >
> >
> > Guozhang
> >
> > On Sun, Apr 10, 2016 at 9:17 AM, Jay Kreps  wrote:
> >
> >> Also, I wonder if this issue is related:
> >> https://issues.apache.org/jira/browse/KAFKA-3135
> >>
> >> -Jay
> >>
> >> On Sun, Apr 10, 2016 at 8:58 AM, Jay Kreps  wrote:
> >>
> >>> Two things:
> >>> 1. Caching data in the processor is a bit dangerous since it will be
> lost
> >>> on failure. Nonetheless, I think you have a point that we should
> ideally
> >>> close the processors first, then commit in case they send any messages
> on
> >>> close.
> >>> 2. The issue you describe shouldn't happen for the reason you describe.
> >>> Both the broker and the consumer handle batches of messages so
> fetching a
> >>> single 1 MB message versus 1024 1KB messages should be the same. The
> >>> proposed max.poll.messages would just affect how many records are
> >>> handed out; they will have been fetched and be in memory in the
> >>> consumer no matter what. I wonder if you could help us trace down
> >>> what's happening for
> >>> you--maybe provide a simple test case that reproduces the problem?
> >>>
> >>>
> >>> On Sun, Apr 10, 2016 at 6:13 AM, Michael D. Coon <
> >>> mdco...@yahoo.com.invalid> wrote:
> >>>
>  Guozhang,
> Yes, I'm merging message contents into larger messages before
> sending
>  to the producer. We have demonstrated that many tiny messages of < 1K
>  cause a tremendous slowdown on the downstream consumers. Not because
> of
>  memory contention but because of the broker filling up the max fetch
>  request size by adding hundreds of thousands of tiny messages to the
> >> fetch
>  response. The consumer then has to deal with those messages and it
> >> causes
>  huge latency problems….the broker has to add those hundreds of
> >> thousands of
>  messages to the response. It takes > 5 seconds per fetch to return
> from
> >> the
>  broker in most cases. In contrast, when I merge messages into bundled
>  single-messages with larger payloads, we get excellent throughput
> >> because
>  there is less polling and the number of messages is reduced.
> I'm locked into a battle between fetch size constraints and max
>  message size constraints…my max message size can actually spike over
> 5MB
>  for a single message (non-merged) but most of the time it's < 1K.
> That's
>  just the kind of data set we're dealing with. So I can't set the fetch
>  size too low, or one of these larger messages will come in and prevent
>  the consumer from being able to process anything.
>  So we either need a way to tell the broker not to fill the max fetch
>  size before returning (max.poll.messages), or I need a way to flush to
>  the producer when the framework is about to close my producer. The latter
>  offers the benefit of flushing data that may be the result of processing
>  input data whose offsets were already committed asynchronously.
>  Mike
> 
>  On Saturday, April 9, 2016 2:27 PM, Guozhang Wang <
> >> wangg...@gmail.com>
>  wrote:
> 
> 
>   Mike,
> 
>  Not clear what you mean by "buffering up the contents". The producer
>  itself already does some buffering and batching when sending to Kafka.
>  Did you actually "merge" multiple small messages into one large message
>  before giving it to the producer in the app code? In either case, I am
>  not sure how it will help the downstream consumer memory pressure issue?
> 
>  About bounding the consumer memory usage, we already have some
> thoughts
>  about that issue and plan to add the memory bounding feature like the
>  producer does in the near future (
>  

Re: KStream Close Processor

2016-04-11 Thread Matthias J. Sax
What about extending the API with a method beforeClose() that enables
the user to flush buffered data?

Maybe we can also rename close() to afterClose(), to make the difference
clear. At least, we should document when close() is called -- from a
user point of view, I would expect that close() allows flushing data,
which is not the case.

-Matthias
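
For illustration only, here is a rough sketch of what such a hook could look
like against the 0.10-era Processor API. beforeClose() is hypothetical (it
does not exist in Kafka Streams today), and FlushableProcessor is an invented
name:

import org.apache.kafka.streams.processor.Processor;

// Hypothetical extension of the Processor contract, only to illustrate the
// proposed callback order; nothing here is part of the real Kafka Streams API.
public interface FlushableProcessor<K, V> extends Processor<K, V> {

    // Would run while the producer and state managers are still open, so a
    // processor could still forward any records it has buffered in memory.
    void beforeClose();

    // The existing close() would then keep its current meaning: cleanup after
    // the underlying clients have been shut down.
}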

On 04/11/2016 02:30 AM, Guozhang Wang wrote:
> Re 1), Kafka Streams intentionally closes all underlying clients before
> closing processors, since part of closing the processors requires shutting
> down their processor state managers; for example, we need to make sure the
> producer's message sends have all been acked before the state manager
> records the changelog sent offsets. To complement this we trigger commitAll()
> before closing the clients.
> 
> 
> Guozhang
> 
> On Sun, Apr 10, 2016 at 9:17 AM, Jay Kreps  wrote:
> 
>> Also, I wonder if this issue is related:
>> https://issues.apache.org/jira/browse/KAFKA-3135
>>
>> -Jay
>>
>> On Sun, Apr 10, 2016 at 8:58 AM, Jay Kreps  wrote:
>>
>>> Two things:
>>> 1. Caching data in the processor is a bit dangerous since it will be lost
>>> on failure. Nonetheless, I think you have a point that we should ideally
>>> close the processors first, then commit in case they send any messages on
>>> close.
>>> 2. The issue you describe shouldn't happen for the reason you describe.
>>> Both the broker and the consumer handle batches of messages so fetching a
>>> single 1 MB message versus 1024 1KB messages should be the same. The
>>> proposed max.poll.messages would just affect how many records are handed
>>> out; they will have been fetched and be in memory in the consumer no
>>> matter what. I wonder if you could help us trace down what's happening for
>>> you--maybe provide a simple test case that reproduces the problem?
>>>
>>>
>>> On Sun, Apr 10, 2016 at 6:13 AM, Michael D. Coon <
>>> mdco...@yahoo.com.invalid> wrote:
>>>
 Guozhang,
Yes, I'm merging message contents into larger messages before sending
 to the producer. We have demonstrated that many tiny messages of < 1K
 cause a tremendous slowdown on the downstream consumers. Not because of
 memory contention but because of the broker filling up the max fetch
 request size by adding hundreds of thousands of tiny messages to the
>> fetch
 response. The consumer then has to deal with those messages and it
>> causes
 huge latency problems….the broker has to add those hundreds of
>> thousands of
 messages to the response. It takes > 5 seconds per fetch to return from
>> the
 broker in most cases. In contrast, when I merge messages into bundled
 single-messages with larger payloads, we get excellent throughput
>> because
 there is less polling and the number of messages is reduced.
I'm locked into a battle between fetch size constraints and max
 message size constraints…my max message size can actually spike over 5MB
 for a single message (non-merged) but most of the time it's < 1K. That's
 just the kind of data set we're dealing with. So I can't set the fetch size
 too low, or one of these larger messages will come in and prevent the
 consumer from being able to process anything.
So we either need a way to tell the broker not to fill the max fetch
 size before returning (max.poll.messages), or I need a way to flush to
 the producer when the framework is about to close my producer. The latter
 offers the benefit of flushing data that may be the result of processing
 input data whose offsets were already committed asynchronously.
 Mike

 On Saturday, April 9, 2016 2:27 PM, Guozhang Wang <
>> wangg...@gmail.com>
 wrote:


  Mike,

 Not clear what you mean by "buffering up the contents". The producer
 itself already does some buffering and batching when sending to Kafka. Did you
 actually "merge" multiple small messages into one large message before
 giving it to the producer in the app code? In either case, I am not sure
 how it will help the downstream consumer memory pressure issue?

 About bounding the consumer memory usage, we already have some thoughts
 about that issue and plan to add the memory bounding feature like the
 producer does in the near future (
 https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a
 problem
 for long. And for the "max.poll.messages" config and 0.10.0, just FYI we
 are shooting to have it released end of this month.

 Guozhang


 On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon
>> 
 wrote:

> Guozhang,
>In my processor, I'm buffering up contents of the final messages in
> order to make them larger. This is to optimize throughput and keep tiny
> messages from being injected downstream. So nothing is being pushed to
 the
> producer 

Re: KStream Close Processor

2016-04-10 Thread Guozhang Wang
Re 1), Kafka Streams intentionally closes all underlying clients before
closing processors, since part of closing the processors requires shutting
down their processor state managers; for example, we need to make sure the
producer's message sends have all been acked before the state manager
records the changelog sent offsets. To complement this we trigger commitAll()
before closing the clients.


Guozhang

On Sun, Apr 10, 2016 at 9:17 AM, Jay Kreps  wrote:

> Also, I wonder if this issue is related:
> https://issues.apache.org/jira/browse/KAFKA-3135
>
> -Jay
>
> On Sun, Apr 10, 2016 at 8:58 AM, Jay Kreps  wrote:
>
> > Two things:
> > 1. Caching data in the processor is a bit dangerous since it will be lost
> > on failure. Nonetheless, I think you have a point that we should ideally
> > close the processors first, then commit in case they send any messages on
> > close.
> > 2. The issue you describe shouldn't happen for the reason you describe.
> > Both the broker and the consumer handle batches of messages so fetching a
> > single 1 MB message versus 1024 1KB messages should be the same. The
> > proposed max.poll.messages would just affect how many records are handed
> > out; they will have been fetched and be in memory in the consumer no
> > matter what. I wonder if you could help us trace down what's happening for
> > you--maybe provide a simple test case that reproduces the problem?
> >
> >
> > On Sun, Apr 10, 2016 at 6:13 AM, Michael D. Coon <
> > mdco...@yahoo.com.invalid> wrote:
> >
> >> Guozhang,
> >>Yes, I'm merging message contents into larger messages before sending
> >> to the producer. We have demonstrated that many tiny messages of < 1K
> >> cause a tremendous slowdown on the downstream consumers. Not because of
> >> memory contention but because of the broker filling up the max fetch
> >> request size by adding hundreds of thousands of tiny messages to the
> fetch
> >> response. The consumer then has to deal with those messages and it
> causes
> >> huge latency problems….the broker has to add those hundreds of
> thousands of
> >> messages to the response. It takes > 5 seconds per fetch to return from
> the
> >> broker in most cases. In contrast, when I merge messages into bundled
> >> single-messages with larger payloads, we get excellent throughput
> because
> >> there is less polling and the number of messages is reduced.
> >>I'm locked into a battle between fetch size constraints and max
> >> message size constraints…my max message size can actually spike over 5MB
> >> for a single message (non-merged) but most of the time it's < 1K. That's
> >> just the kind of data set we're dealing with. So I can't set the fetch
> >> size too low, or one of these larger messages will come in and prevent
> >> the consumer from being able to process anything.
> >>So we either need a way to tell the broker not to fill the max fetch
> >> size before returning (max.poll.messages), or I need a way to flush to
> >> the producer when the framework is about to close my producer. The latter
> >> offers the benefit of flushing data that may be the result of processing
> >> input data whose offsets were already committed asynchronously.
> >> Mike
> >>
> >> On Saturday, April 9, 2016 2:27 PM, Guozhang Wang <
> wangg...@gmail.com>
> >> wrote:
> >>
> >>
> >>  Mike,
> >>
> >> Not clear what you mean by "buffering up the contents". The producer
> >> itself already does some buffering and batching when sending to Kafka.
> >> Did you
> >> actually "merge" multiple small messages into one large message before
> >> giving it to the producer in the app code? In either case, I am not sure
> >> how it will help the downstream consumer memory pressure issue?
> >>
> >> About bounding the consumer memory usage, we already have some thoughts
> >> about that issue and plan to add the memory bounding feature like the
> >> producer does in the near future (
> >> https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a
> >> problem
> >> for long. And for the "max.poll.messages" config and 0.10.0, just FYI we
> >> are shooting to have it released end of this month.
> >>
> >> Guozhang
> >>
> >>
> >> On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon
>  >> >
> >> wrote:
> >>
> >> > Guozhang,
> >> >In my processor, I'm buffering up contents of the final messages in
> >> > order to make them larger. This is to optimize throughput and keep
> >> > tiny messages from being injected downstream. So nothing is being pushed to
> >> the
> >> > producer until my configured thresholds are met in the buffering
> >> mechanism.
> >> > So as it stands, these messages are left dangling after the producer
> >> closes
> >> > and, even worse, if periodic commits are happening behind the scenes,
> >> the
> >> > data is lost on restart.
> >> >What we need is a way to notify the processors that everything is
> >> > "about" to close so that I can properly flush what I have in memory
> 

Re: KStream Close Processor

2016-04-09 Thread Guozhang Wang
Mike,

Not clear what you mean by "buffering up the contents". The producer itself
already does some buffering and batching when sending to Kafka. Did you
actually "merge" multiple small messages into one large message before
giving it to the producer in the app code? In either case, I am not sure
how it will help the downstream consumer memory pressure issue?

About bounding the consumer memory usage, we already have some thoughts
about that issue and plan to add the memory bounding feature like the
producer does in the near future (
https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a problem
for long. And for the "max.poll.messages" config and 0.10.0, just FYI we
are shooting to have it released end of this month.

Guozhang
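
For illustration, a minimal sketch of the built-in producer batching referred
to above. The broker address, topic name, and the batch.size / linger.ms
values are arbitrary examples, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchingProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        // The producer groups records per partition into batches of up to
        // batch.size bytes and waits up to linger.ms before sending a batch
        // that is not yet full; both values below are illustrative.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "tiny message".getBytes()));
            // close() (via try-with-resources) flushes whatever is still batched.
        }
    }
}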


On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon 
wrote:

> Guozhang,
>In my processor, I'm buffering up contents of the final messages in
> order to make them larger. This is to optimize throughput and keep tiny
> messages from being injected downstream. So nothing is being pushed to the
> producer until my configured thresholds are met in the buffering mechanism.
> So as it stands, these messages are left dangling after the producer closes
> and, even worse, if periodic commits are happening behind the scenes, the
> data is lost on restart.
>What we need is a way to notify the processors that everything is
> "about" to close so that I can properly flush what I have in memory out to
> the producer. Otherwise, I'm stuck with always sending tiny messages into
> kafka--which I know for certain causes problems on downstream consumers
> (where they set a high fetch memory size and it causes hundreds of
> thousands of messages to be retrieved at a time…and thus bogs down the
> consumer). I think the "max.poll.messages" setting we discussed before
> would help here but if it's not available until 0.10, I'm kind of stuck.
> Another option might be to disable periodic commits and only commit
> when the processor requests it. This would mitigate some data loss and is
> better than nothing. There is still a chance that data in RecordQueue not
> yet sent to my processor would be committed but never processed in this
> case.
> Another thought I had was to reduce the max fetch size; however, some
> messages can be very large (i.e. data spikes periodically). In this case,
> the message size would exceed my lower max fetch size, causing the consumer
> to simply stop consuming. So I'm stuck. So either we need to roll in the
> max.poll.messages sooner than 0.10 or maybe a callback mechanism letting me
> know that the producer is about to close so I can clear my buffers.
> Ideas?
> Mike
>
> On Friday, April 8, 2016 8:24 PM, Guozhang Wang 
> wrote:
>
>
>  Hi Michael,
>
> When you call KafkaStreams.close(), it will first trigger a commitAll()
> function, which will 1) flush the local state store if necessary; 2) flush
> messages buffered in the producer; 3) commit offsets on the consumer. Then it
> will close the producer / consumer clients and shut down the tasks. So when
> you see the processor's "close" function triggered, any buffered messages in
> the producer should already have been flushed.
>
> Did you see a different behavior than what I described above?
>
> Guozhang
>
>
> On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon  >
> wrote:
>
> > All,
> >I'm seeing my processor's "close" method being called AFTER my
> > downstream producer has been closed. I had assumed that on close I would
> be
> > able to flush whatever I had been buffering up to send to my Kafka topic. In
> > other words, we've seen significant performance differences in building
> > flows with small messages and large messages in/out of kafka. So my
> > processor buffers up messages to a threshold and flushes those as a
> > composite message bundle to improve downstream processing. But if this
> > close method is called AFTER the producer has already been closed, I
> would
> > have no way to actually flush the final composite bundles to my topic on
> > shutdown. Is there some way to get a call BEFORE producer shutdown
> occurs?
> > Mike
> >
> >
>
>
> --
> -- Guozhang
>
>
>
>



-- 
-- Guozhang


Re: KStream Close Processor

2016-04-09 Thread Michael D. Coon
Guozhang,
   In my processor, I'm buffering up contents of the final messages in order to 
make them larger. This is to optimize throughput and keep tiny messages from 
being injected downstream. So nothing is being pushed to the producer until my 
configured thresholds are met in the buffering mechanism. So as it stands, 
these messages are left dangling after the producer closes and, even worse, if 
periodic commits are happening behind the scenes, the data is lost on restart.
   What we need is a way to notify the processors that everything is "about" to 
close so that I can properly flush what I have in memory out to the producer. 
Otherwise, I'm stuck with always sending tiny messages into kafka--which I know 
for certain causes problems on downstream consumers (where they set a high 
fetch memory size and it causes hundreds of thousands of messages to be 
retrieved at a time…and thus bogs down the consumer). I think the 
"max.poll.messages" setting we discussed before would help here but if it's not 
available until 0.10, I'm kind of stuck.
    Another option might be to disable periodic commits and only commit when 
the processor requests it. This would mitigate some data loss and is better 
than nothing. There is still a chance that data in RecordQueue not yet sent to 
my processor would be committed but never processed in this case.
    Another thought I had was to reduce the max fetch size; however, some 
messages can be very large (i.e. data spikes periodically). In this case, the 
message size would exceed my lower max fetch size, causing the consumer to 
simply stop consuming. So I'm stuck. So either we need to roll in the 
max.poll.messages sooner than 0.10 or maybe a callback mechanism letting me 
know that the producer is about to close so I can clear my buffers. 
    Ideas?
Mike
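
For illustration, a rough consumer-side sketch of the tension described above.
The broker address, group id, topic, and the byte/record counts are
assumptions; max.poll.records is the name under which the proposed per-poll
cap eventually shipped in 0.10.0:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchSizeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-size-example");      // assumed group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // The per-partition fetch size has to stay above the largest single
        // message (spikes of ~5MB in this thread), so it cannot be lowered to
        // throttle the flood of tiny records.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 6 * 1024 * 1024);

        // The cap on records handed back per poll() -- available from 0.10.0 on.
        props.put("max.poll.records", 500);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // assumed topic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // handle each (possibly bundled) record here
            }
        }
    }
}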

On Friday, April 8, 2016 8:24 PM, Guozhang Wang  wrote:
 

 Hi Michael,

When you call KafkaStreams.close(), it will first trigger a commitAll()
function, which will 1) flush the local state store if necessary; 2) flush
messages buffered in the producer; 3) commit offsets on the consumer. Then it
will close the producer / consumer clients and shut down the tasks. So when
you see the processor's "close" function triggered, any buffered messages in
the producer should already have been flushed.

Did you see a different behavior than what I described above?

Guozhang


On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon 
wrote:

> All,
>    I'm seeing my processor's "close" method being called AFTER my
> downstream producer has been closed. I had assumed that on close I would be
> able to flush whatever I had been buffering up to send to my Kafka topic. In
> other words, we've seen significant performance differences in building
> flows with small messages and large messages in/out of kafka. So my
> processor buffers up messages to a threshold and flushes those as a
> composite message bundle to improve downstream processing. But if this
> close method is called AFTER the producer has already been closed, I would
> have no way to actually flush the final composite bundles to my topic on
> shutdown. Is there some way to get a call BEFORE producer shutdown occurs?
> Mike
>
>


-- 
-- Guozhang


  

Re: KStream Close Processor

2016-04-08 Thread Guozhang Wang
Hi Michael,

When you call KafkaStreams.close(), it will first trigger a commitAll()
function, which will 1) flush the local state store if necessary; 2) flush
messages buffered in the producer; 3) commit offsets on the consumer. Then it
will close the producer / consumer clients and shut down the tasks. So when
you see the processor's "close" function triggered, any buffered messages in
the producer should already have been flushed.

Did you see a different behavior than what I described above?

Guozhang
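
For reference, a minimal application-side sketch of that shutdown path,
written against the 0.10-era API (the application id, broker address, and
topic names are placeholders, and the trivial topology is only there to make
the example complete); the comment restates the ordering described above:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ShutdownOrderExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shutdown-order-example"); // assumed id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // assumed broker

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic"); // trivial pass-through topology

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // close() first runs commitAll() -- flush state stores, flush the
                // messages buffered in the producer, commit consumer offsets --
                // and only afterwards closes the clients and calls Processor.close().
                streams.close();
            }
        });
    }
}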


On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon 
wrote:

> All,
>I'm seeing my processor's "close" method being called AFTER my
> downstream producer has been closed. I had assumed that on close I would be
> able to flush whatever I had been buffering up to send to my Kafka topic. In
> other words, we've seen significant performance differences in building
> flows with small messages and large messages in/out of kafka. So my
> processor buffers up messages to a threshold and flushes those as a
> composite message bundle to improve downstream processing. But if this
> close method is called AFTER the producer has already been closed, I would
> have no way to actually flush the final composite bundles to my topic on
> shutdown. Is there some way to get a call BEFORE producer shutdown occurs?
> Mike
>
>


-- 
-- Guozhang


KStream Close Processor

2016-04-08 Thread Michael D. Coon
All,
   I'm seeing my processor's "close" method being called AFTER my downstream 
producer has been closed. I had assumed that on close I would be able to flush 
whatever I had been buffering up to send to my Kafka topic. In other words, we've 
seen significant performance differences in building flows with small messages 
and large messages in/out of kafka. So my processor buffers up messages to a 
threshold and flushes those as a composite message bundle to improve downstream 
processing. But if this close method is called AFTER the producer has already 
been closed, I would have no way to actually flush the final composite bundles 
to my topic on shutdown. Is there some way to get a call BEFORE producer 
shutdown occurs?
Mike
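
For illustration, a minimal sketch of the bundling pattern described above,
written against the 0.10-era Processor API. The class name, key handling, and
threshold are invented, and the flush attempted in close() is exactly the step
that cannot work today, because by then the producer has already been closed:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class BundlingProcessor implements Processor<String, String> {
    private static final int BUNDLE_SIZE = 500; // illustrative threshold
    private final List<String> buffer = new ArrayList<>();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        buffer.add(value);
        if (buffer.size() >= BUNDLE_SIZE) {
            flush(key);
        }
    }

    private void flush(String key) {
        if (!buffer.isEmpty()) {
            // Forward one composite record downstream instead of many tiny ones.
            context.forward(key, String.join("\n", buffer));
            buffer.clear();
        }
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() {
        // The flush Mike is asking for: by the time close() runs, the producer
        // is already closed, so this last bundle never reaches the output topic.
        flush(null);
    }
}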