Guozhang,

The example works well for aggregate operations. How can we achieve this if the processing has to be done in micro-batches? One way would be to store the incoming records in a KV store whose values are lists and process them in punctuate(). With the current KV stores, that would mean (de)serializing the whole list on every update, which is not very efficient. Or maybe there is a way around it? A quick search on RocksDB shows there is a merge operator. Could that be of use here?
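One possible way around the list (de)serialization might be to give every buffered record its own entry in a sorted store, keyed by a sequence number, and let punctuate() iterate over the entries. Below is a minimal sketch of that idea. A TreeMap stands in for the state store, and MicroBatchBuffer with its process()/punctuate()/buffered() methods are names made up for illustration, so this is not the Kafka Streams API itself:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a processor that buffers records and flushes
// them periodically. The TreeMap plays the role of a sorted key-value
// store; it is an assumption for illustration, not a Kafka Streams class.
class MicroBatchBuffer {
    // Keyed by sequence number, so iteration returns records in arrival order.
    private final TreeMap<Long, String[]> store = new TreeMap<>();
    private long nextSeq = 0;

    // Analogue of process(): one small put per record, no list is rewritten.
    void process(String key, String value) {
        store.put(nextSeq++, new String[] {key, value});
    }

    // Analogue of punctuate(): iterate over the buffered entries,
    // collect them as one batch, then clear the buffer.
    List<String> punctuate() {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<Long, String[]> e : store.entrySet()) {
            batch.add(e.getValue()[0] + "=" + e.getValue()[1]);
        }
        store.clear();
        return batch;
    }

    // Number of records currently waiting for the next flush.
    int buffered() {
        return store.size();
    }

    public static void main(String[] args) {
        MicroBatchBuffer buffer = new MicroBatchBuffer();
        buffer.process("a", "1");
        buffer.process("b", "2");
        System.out.println(buffer.punctuate());
    }
}
```

With this layout each incoming record costs one small write, and the batch is only read as a whole when it is flushed. Whether this beats a RocksDB merge operator in practice is something I have not measured.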
Srikanth

On Sun, Sep 11, 2016 at 11:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Ara,
>
> On the processor API, users have the flexibility to do micro-batching with
> their own implementation patterns. For example, like you mentioned already:
>
> 1. Use a state store to bookkeep recently received records, and in the
> process() function simply put the record into the store.
> 2. Use the punctuate() function to periodically process the batch
> bookkept in the state store by iterating over it, and send the results
> downstream.
>
> You can find a simple example in the WordCount demo:
>
> https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
>
> Note that it does not bookkeep the original records as micro-batches, but
> computes the running aggregate results. But the general coding pattern is
> the same.
>
> On the higher-level streams DSL, there is a proposed KIP for using caching
> for aggregate operators, as an implicit "trigger" mechanism. This
> is not exactly the same as micro-batching, but it also reduces IO
> costs as well as data traffic:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>
> Let me know if these references are helpful to you.
>
> Guozhang
>
> On Mon, Sep 5, 2016 at 12:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> > Hi,
> >
> > What’s the best way to do micro-batching in Kafka Streams? Any plans for a
> > built-in mechanism? Perhaps StateStore could act as the buffer? What
> > exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem
> > to be used anywhere?
> >
> > http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
> >
> > Ara.
>
> --
> -- Guozhang