One way I can think of is to add an index suffix to the key to differentiate records with the same key, so you can still store records not as a list but as separate entries in the KV store, like:
<k-1, v> <k-2, v> ...

Then, when punctuating, you can either scan the whole store or do a range query on the key prefix to apply your computational logic.

Guozhang

On Fri, Sep 23, 2016 at 9:23 AM, Srikanth <srikanth...@gmail.com> wrote:

> Guozhang,
>
> The example works well for aggregate operations. How can we achieve this if processing has to be done in micro-batches?
> One way would be to store the incoming records in a List-type KV store and process them in punctuate().
> With the current KV stores, that would mean (de)serializing a list, which is not very efficient. Or maybe there is a way around it?
> A quick search on RocksDB shows there is a merge operator. Could that be of use here?
>
> Srikanth
>
> On Sun, Sep 11, 2016 at 11:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Ara,
> >
> > With the Processor API, users have the flexibility to do micro-batching with their own implementation patterns. For example, as you already mentioned:
> >
> > 1. Use a state store to bookkeep recently received records, and in the process() function simply put each record into the store.
> > 2. Use the punctuate() function to periodically process the bookkept batch by iterating over the state store, and send the results downstream.
> >
> > You can find a simple example in the WordCount demo:
> >
> > https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
> >
> > Note that it does not bookkeep the original records as micro-batches, but computes running aggregate results. The general coding pattern is the same, though.
> >
> > On the higher-level Streams DSL, there is a proposed KIP to use caching for aggregate operators as an implicit "trigger" mechanism.
> > This is not exactly the same as micro-batching, but it also reduces I/O costs as well as data traffic:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
> >
> > Let me know if these references are helpful to you.
> >
> > Guozhang
> >
> > On Mon, Sep 5, 2016 at 12:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
> >
> > > Hi,
> > >
> > > What’s the best way to do micro-batching in Kafka Streams? Any plans for a built-in mechanism? Perhaps a StateStore could act as the buffer? What exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem to be used anywhere?
> > >
> > > http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
> > >
> > > Ara.
> >
> > --
> > -- Guozhang
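Guozhang's index-suffix suggestion at the top of this reply, combined with the process()/punctuate() buffering pattern from his Sep 11 message, can be sketched without any Kafka dependency. This is a minimal illustration under stated assumptions, not Kafka Streams code: a `java.util.TreeMap` stands in for a sorted KV state store such as RocksDB, the class and method names (`MicroBatchSketch`, `process`, `punctuate`, `punctuateAll`) are hypothetical, and record keys are assumed never to contain `-`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, library-free sketch: a TreeMap stands in for a sorted KV state store.
public class MicroBatchSketch {
    // Buffered records, one entry per record: "<key>-<index>" -> value.
    private final TreeMap<String, String> store = new TreeMap<>();
    // Last index handed out per record key.
    private final Map<String, Long> lastIndex = new HashMap<>();

    // Zero-padding the index keeps entries in arrival order;
    // unpadded, "k-10" would sort before "k-2".
    // Assumes record keys never contain '-'.
    private static String suffixedKey(String key, long index) {
        return String.format("%s-%010d", key, index);
    }

    // process(): store each record as its own entry (no list to (de)serialize).
    public void process(String key, String value) {
        long index = lastIndex.merge(key, 1L, Long::sum);
        store.put(suffixedKey(key, index), value);
    }

    // punctuate() for one key: range query over the "<key>-" prefix, then drop the batch.
    public List<String> punctuate(String key) {
        // '.' is the character right after '-', so [key + "-", key + ".") covers exactly the prefix.
        NavigableMap<String, String> batch = store.subMap(key + "-", true, key + ".", false);
        List<String> values = new ArrayList<>(batch.values());
        batch.clear(); // clearing the submap view removes the entries from the backing store
        lastIndex.remove(key);
        return values;
    }

    // punctuate() over the whole store: scan everything, group by original key, then clear.
    public Map<String, List<String>> punctuateAll() {
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : store.entrySet()) {
            String suffixed = e.getKey();
            String key = suffixed.substring(0, suffixed.lastIndexOf('-'));
            batches.computeIfAbsent(key, k -> new ArrayList<>()).add(e.getValue());
        }
        store.clear();
        lastIndex.clear();
        return batches;
    }

    public static void main(String[] args) {
        MicroBatchSketch sketch = new MicroBatchSketch();
        for (int i = 1; i <= 11; i++) {
            sketch.process("k", "v" + i);
        }
        sketch.process("other", "x");
        System.out.println(sketch.punctuate("k"));  // [v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11]
        System.out.println(sketch.punctuateAll());  // {other=[x]}
    }
}
```

In an actual processor, the TreeMap would presumably be replaced by a `KeyValueStore`, using its `range()` or `all()` iterators in punctuate() and `delete()` to drop processed entries. The zero-padded suffix should matter there too, since a RocksDB-backed store orders keys by their serialized bytes.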