Re: Micro-batching in Kafka streams - redux

2017-10-20 Thread Matthias J. Sax
Hi,

as Kafka Streams focuses on stream processing, micro-batching is
something we don't consider. Thus, nothing has changed/improved.

About the store question:

If you buffer up your writes in a store, you need to delete those values
from the store later on to avoid the store growing unbounded. If you
do this, Kafka Streams will also write corresponding tombstones to the
changelog topic, and thus compaction will work just fine.

General comment: a strict alignment of offset commits and your batch
writes to an external store is not possible, as there is no
callback/notification when an offset commit happens. However, you also
don't need it to build a correct solution. You can just define your
external write interval as whatever value you want.
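
To make that concrete, here is a minimal sketch of the buffer-and-flush
pattern against the 0.10.x Processor API. The store name, the punctuation
interval, and the ExternalSink client are illustrative assumptions, not a
prescribed implementation:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class BatchingProcessor implements Processor<String, String> {

    private KeyValueStore<String, String> buffer;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        buffer = (KeyValueStore<String, String>) context.getStateStore("batch-buffer");
        context.schedule(30000L); // punctuate every ~30s of stream time: the write interval
    }

    @Override
    public void process(String key, String value) {
        buffer.put(key, value); // only bookkeep here; no external I/O per record
    }

    @Override
    public void punctuate(long timestamp) {
        try (KeyValueIterator<String, String> it = buffer.all()) {
            while (it.hasNext()) {
                KeyValue<String, String> entry = it.next();
                ExternalSink.write(entry.key, entry.value); // hypothetical external client
                buffer.delete(entry.key); // tombstone goes to the changelog, so compaction cleans up
            }
        }
    }

    @Override
    public void close() { }
}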

Hope this helps.


-Matthias

On 10/20/17 12:36 PM, Shrijeet Paliwal wrote:
> [...]





Micro-batching in Kafka streams - redux

2017-10-20 Thread Shrijeet Paliwal
Kafka version: 0.10.2.1 (can upgrade if needed)

I wish to revive the discussion around micro-batching in Kafka Streams.
Some past discussions are here <http://markmail.org/thread/zdxkvwt6ppq2xhv2>
& here <http://markmail.org/thread/un7dmn7pyk7eibxz>.

I am exploring ways to do at-least-once processing of events which are
handled in small batches as opposed to one at a time. A specific example is
buffering mutation ops to a non-Kafka sink and aligning the flushing of
batched ops with the offset commits.

The suggestions and workarounds that I have noticed in mailing lists are:

*a] Don't do it in Kafka Streams, use Kafka Connect.*

For the sake of this discussion, let's assume using Kafka Connect isn't an
option.

*b] In Kafka Streams, use a key-value state store to micro-batch and
perform a flush in the punctuate method.*

The overhead of this approach seems nontrivial: since a persistent
key-value store is backed by a compacted topic, the keys in the state store
will not be compaction friendly. For instance, if you use a timestamp &
unique-id combination as the key and perform a range scan to find ops
buffered since the last call to punctuate, the state store & backing Kafka
topic will grow unbounded. Any retention applied to the state store or
topic would mean leaking implementation details, which makes this approach
inelegant.

My question is: since the last time this use case was discussed, has a
better pattern emerged?

--
Shrijeet


Re: micro-batching in kafka streams

2016-09-28 Thread Ara Ebrahimi
The tricky part is reconciling event source time and event processing time.
Clearly this trigger is in the event processing time whereas the data
is in the event source time most probably.

Something like that :)

Ara.

On Sep 26, 2016, at 1:59 AM, Michael Noll <mich...@confluent.io> wrote:

[...]




Re: micro-batching in kafka streams

2016-09-28 Thread Guozhang Wang
> [...]



-- 
-- Guozhang


Re: micro-batching in kafka streams

2016-09-28 Thread Ara Ebrahimi
[...]




Re: micro-batching in kafka streams

2016-09-28 Thread Guozhang Wang
Ara,

Are you using the interactive queries feature and encountering issues due
to locking file conflicts?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams

This is not expected to happen; if you are indeed using this feature, I'd
like to learn more about your error scenario.
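
(For reference, read access to a store through interactive queries normally
looks like the sketch below; the store name, the types, and the running
streams instance are assumptions. It goes through the running instance's
own store handle, so it should not open a second RocksDB instance and hit
the lock.)

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// "streams" is an already-running KafkaStreams instance (assumption).
ReadOnlyKeyValueStore<String, Long> view =
    streams.store("hourly-stats", QueryableStoreTypes.<String, Long>keyValueStore());

Long count = view.get("some-key"); // read-only; no second RocksDB handle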

Guozhang


On Tue, Sep 27, 2016 at 9:41 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
wrote:

> One more thing:
>
> Guozhang pointed me towards this sample for micro-batching:
> https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
>
> This is a good example and I successfully adapted it for my use case. BUT
> the main problem is that even though my use case deals with writing hourly
> windows of data, and hence the data is already in a RocksDB file, I need
> to create a duplicate of the same file just to be able to periodically do
> range scans on it and write to the external database. I did try to see if
> I could get StateStore to read the same RocksDB file used by the
> aggregateByKey which is happening before this step, but it complained
> about not being able to lock the file. It would be great to be able to
> share the same underlying file between aggregateByKey (or any other such
> KTable-producing operation) and such periodic triggers.
>
> Ara.
>
> On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> [...]

Re: micro-batching in kafka streams

2016-09-27 Thread Ara Ebrahimi
One more thing:

Guozhang pointed me towards this sample for micro-batching: 
https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

This is a good example and I successfully adapted it for my use case. BUT
the main problem is that even though my use case deals with writing hourly
windows of data, and hence the data is already in a RocksDB file, I need to
create a duplicate of the same file just to be able to periodically do range
scans on it and write to the external database. I did try to see if I could
get StateStore to read the same RocksDB file used by the aggregateByKey
which is happening before this step, but it complained about not being able
to lock the file. It would be great to be able to share the same underlying
file between aggregateByKey (or any other such KTable-producing operation)
and such periodic triggers.

Ara.

On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

[...]

Re: micro-batching in kafka streams

2016-09-26 Thread Ara Ebrahimi
Hi,

So, here’s the situation:

- For classic batching of writes to external systems, right now I simply
hack it. This specific case is writing records to an Accumulo database, and
I simply use the batch writer to batch writes; it flushes every second or
so. I’ve added a shutdown hook to the JVM to flush upon graceful exit too.
This is good enough for me, but obviously it’s not perfect. I wish Kafka
Streams had some sort of a trigger (based on x number of records processed,
or a y window of time passed). Which brings me to the next use case.

- I have some logic for calculating hourly statistics, so I’m dealing with
windowed data already. These stats then need to be written to an external
database for use by user-facing systems. Obviously I need to write the
final result for each hourly window after we’re past that window of time
(or I could write as often as it gets updated, but the problem is that the
external database is not as fast as Kafka). I do understand that I need to
take into account the fact that events may arrive out of order, and that
some records may arrive a little bit after I’ve considered the previous
window over and have moved on to the next one. I’d like some sort of an
hourly trigger (not just a pure x-milliseconds trigger, but also support
for cron-style timing), and then also the option to update the stats I’ve
already written for a window for a set amount of time after the trigger
fired, so that I can deal with events which arrive after the write for that
window. And then there’s a cut-off point after which updating the stats for
a very old window is just not worth it. Something like this DSL:

kstream.trigger(
    /* when to trigger */ Cron.of("0 * * * *"),
    /* update every hour afterwards */ Hours.toMillis(1),
    /* discard changes older than this */ Hours.toMillis(24),
    /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ });

The tricky part is reconciling event source time and event processing time.
Clearly this trigger is in the event processing time, whereas the data is
most probably in the event source time.

Something like that :)

Ara.

> On Sep 26, 2016, at 1:59 AM, Michael Noll <mich...@confluent.io> wrote:
>
> may I ask why you need to use micro-batching in the first place?
>
> [...]




Re: micro-batching in kafka streams

2016-09-26 Thread Srikanth
Guozhang,
It's a bit hacky, but I guess it will work fine, as a range scan isn't
expensive in RocksDB.

Michael,
One reason is to be able to batch before sinking to an external system;
a sink call per record isn't very efficient.
This could be used just for the sink processor.

I feel I might be stealing this thread from Ara :-). Let's wait and hear
his reasons.

Srikanth

On Mon, Sep 26, 2016 at 4:59 AM, Michael Noll <mich...@confluent.io> wrote:

> [...]


Re: micro-batching in kafka streams

2016-09-26 Thread Michael Noll
Ara,

may I ask why you need to use micro-batching in the first place?

Reason why I am asking: Typically, when people talk about micro-batching,
they are refer to the way some originally batch-based stream processing
tools "bolt on" real-time processing by making their batch sizes really
small.  Here, micro-batching belongs to the realm of the inner workings of
the stream processing tool.

Orthogonally to that, you have features/operations such as windowing,
triggers, etc. that -- unlike micro-batching -- allow you as the user of
the stream processing tool to define which exact computation logic you
need.  Whether or not, say, windowing is or is not computed via
micro-batching behind the scenes should (at least in an ideal world) be of
no concern to the user.

-Michael





On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
wrote:

> [...]


Re: micro-batching in kafka streams

2016-09-23 Thread Guozhang Wang
One way that I can think of is to add an index suffix on the key to
differentiate records with the same key, so you can still store records
not as a list but as separate entries in the KV store, like:

<k-1, v>
<k-2, v>

...

And then when punctuating, you can still scan the whole store or do a range
query based on the key prefix to apply your computational logic.
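
A rough sketch of what this can look like inside a Processor (0.10.x API);
the buffer store handle and the seq counter are assumptions, and since the
default String serde makes RocksDB order keys by their UTF-8 bytes, the
range below acts as a crude prefix scan:

// In process(): give each record a unique, prefix-scannable key.
// "seq" is a hypothetical per-processor counter; "buffer" is the
// KeyValueStore<String, String> obtained in init().
buffer.put(key + "-" + seq++, value);   // e.g. "k-1", "k-2", ...

// In punctuate(): range-scan everything buffered under a prefix,
// apply the computational logic, then delete the entries again.
try (KeyValueIterator<String, String> it = buffer.range("k-", "k-\uffff")) {
    while (it.hasNext()) {
        KeyValue<String, String> entry = it.next();
        // ... apply computational logic to entry.value ...
        buffer.delete(entry.key);       // tombstone keeps the store bounded
    }
}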


Guozhang


On Fri, Sep 23, 2016 at 9:23 AM, Srikanth <srikanth...@gmail.com> wrote:

> Guozhang,
>
> The example works well for aggregate operations. How can we achieve this
> if the processing has to be done in micro-batches?
> One way would be to store the incoming records in a List-type KV store and
> process them in punctuate.
> With the current KV stores, that would mean (de)serializing a whole list,
> which is not very efficient. Or maybe there is a way around it?
> A quick search shows that RocksDB has a merge operator. Could that be of
> use here?
>
> Srikanth
>
> On Sun, Sep 11, 2016 at 11:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > [...]



-- 
-- Guozhang


Re: micro-batching in kafka streams

2016-09-23 Thread Srikanth
Guozhang,

The example works well for aggregate operations. How can we achieve this if
the processing has to be done in micro-batches?
One way would be to store the incoming records in a List-type KV store and
process them in punctuate.
With the current KV stores, that would mean (de)serializing a whole list,
which is not very efficient. Or maybe there is a way around it?
A quick search shows that RocksDB has a merge operator. Could that be of
use here?
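
To make the serde concern concrete, here is a sketch of the List-per-key
variant inside a Processor's process(); the listStore handle and its list
serde are assumptions:

// The whole list is deserialized on get() and reserialized on put()
// for every single record, which is the inefficiency in question.
List<String> batch = listStore.get(key);   // deserializes the full list
if (batch == null) {
    batch = new ArrayList<>();
}
batch.add(value);
listStore.put(key, batch);                 // reserializes the full list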

Srikanth

On Sun, Sep 11, 2016 at 11:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> [...]


Re: micro-batching in kafka streams

2016-09-12 Thread Ara Ebrahimi
Thanks.

+1 on KIP-63 story. I need all of that :)

Ara.

> On Sep 11, 2016, at 8:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> [...]




Re: micro-batching in kafka streams

2016-09-11 Thread Guozhang Wang
Hello Ara,

On the Processor API, users have the flexibility to do micro-batching with
their own implementation patterns. For example, like you mentioned already:

1. Use a state store to bookkeep recently received records, and in the
process() function simply put the record into the store.
2. Use the punctuate() function to periodically process the bookkept batch
in the state store by iterating over it, and send the results downstream.

You can find a simple example in the WordCount demo:

https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

Note that it does not bookkeep the original records as micro-batches, but
computes the running aggregate results. The general coding pattern,
however, is the same.
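
A condensed sketch of those two steps (0.10.x Processor API; "store" and
"context" are the fields a Processor would set up in init()):

// Step 1, in process(): only bookkeep the record.
@Override
public void process(String key, String value) {
    store.put(key, value);
}

// Step 2, in punctuate(): iterate the bookkept state and emit downstream.
@Override
public void punctuate(long timestamp) {
    try (KeyValueIterator<String, String> it = store.all()) {
        while (it.hasNext()) {
            KeyValue<String, String> entry = it.next();
            context.forward(entry.key, entry.value);
        }
    }
    context.commit(); // commit after forwarding, as the demo does
}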

On the higher-level Streams DSL, there is a proposed KIP for using caching
for aggregate operators as an implicit "trigger" mechanism. This is not
exactly the same as micro-batching, but it also reduces IO costs as well
as data traffic:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams


Let me know if these references are helpful to you.

Guozhang






On Mon, Sep 5, 2016 at 12:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
wrote:

> [...]



-- 
-- Guozhang


micro-batching in kafka streams

2016-09-05 Thread Ara Ebrahimi
Hi,

What’s the best way to do micro-batching in Kafka Streams? Any plans for a 
built-in mechanism? Perhaps StateStore could act as the buffer? What exactly 
are ProcessorContext.schedule()/punctuate() for? They don’t seem to be used 
anywhere?

http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/

Ara.




