Ara, I'd recommend using the interactive queries feature, available in the upcoming 0.10.1 release in a couple of weeks, to query the current snapshot of the state store.
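For reference, a minimal sketch of what querying a store through this API is expected to look like, per KIP-67 (the store name "counts-store", the key/value types, and the keys queried are made up for illustration, and details may still shift before the 0.10.1 release):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class InteractiveQueryExample {

        // Assumes `streams` was built with a queryable store named
        // "counts-store" and is already running.
        static void queryCounts(KafkaStreams streams) {
            ReadOnlyKeyValueStore<String, Long> view =
                streams.store("counts-store", QueryableStoreTypes.<String, Long>keyValueStore());

            // Point lookup against the current snapshot of the local state.
            System.out.println("some-key -> " + view.get("some-key"));

            // Range scan; the returned iterator must be closed.
            try (KeyValueIterator<String, Long> range = view.range("a", "z")) {
                while (range.hasNext()) {
                    System.out.println(range.next());
                }
            }
        }
    }

Because the handle is read-only, it does not compete with the running pipeline for the RocksDB LOCK file the way a second writable store instance does.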
We are going to publish a blog post soon with step-by-step instructions for leveraging this feature for use cases just like yours.

Guozhang

On Wed, Sep 28, 2016 at 2:19 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

> I need this ReadOnlyKeyValueStore.
>
> In my use case, I do an aggregateByKey(), so a KTable is formed, backed by
> a state store. This is then used by the next steps of the pipeline. Now,
> following the word count sample, I try to read the state store. Hence I
> end up sharing it with the actual pipeline. And Kafka Streams doesn't like
> that, because apparently state stores are assumed to be writable (indeed
> there's a put() in there), and deep in the RocksDB state store code it
> tries to create a lock file (and indeed there is a file named LOCK in
> there).
>
> Ara.
>
> On Sep 28, 2016, at 2:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Ara,
>
> Are you using the interactive queries feature but encountering issues due
> to locking file conflicts?
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
>
> This is not expected to happen. If you are indeed using this feature, I'd
> like to learn more about your error scenario.
>
> Guozhang
>
> On Tue, Sep 27, 2016 at 9:41 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> One more thing:
>
> Guozhang pointed me towards this sample for micro-batching:
> https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
>
> This is a good example, and I successfully adapted it for my use case.
> BUT the main problem is that even though my use case deals with writing
> hourly windows of data, and hence the data is already in a RocksDB file,
> I need to create a duplicate of the same file just to be able to
> periodically do range scans on it and write to the external database. I
> did try to see if I could get StateStore to read the same RocksDB file
> used by the aggregateByKey which happens before this step, but it
> complained about not being able to lock the file. It would be great to be
> able to share the same underlying file between aggregateByKey (or any
> other such KTable-producing operation) and such periodic triggers.
>
> Ara.
>
> On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> Hi,
>
> So, here's the situation:
>
> - For classic batching of writes to external systems, right now I simply
> hack it. This specific case is writing of records to an Accumulo database,
> and I simply use the batch writer to batch writes; it flushes every second
> or so. I've added a shutdown hook to the JVM to flush upon graceful exit
> too. This is good enough for me, but obviously it's not perfect. I wish
> Kafka Streams had some sort of a trigger (based on x number of records
> processed, or y window of time passed). Which brings me to the next use
> case.
>
> - I have some logic for calculating hourly statistics, so I'm dealing
> with windowed data already. These stats then need to be written to an
> external database for use by user-facing systems. Obviously I need to
> write the final result for each hourly window after we're past that window
> of time (or I can write as often as it gets updated, but the problem is
> that the external database is not as fast as Kafka). I do understand that
> I need to take into account the fact that events may arrive out of order,
> and there may be some records arriving a little bit after I've considered
> the previous window over and have moved on to the next one. I'd like to
> have some sort of an hourly trigger (not just a pure x-milliseconds
> trigger, but also support for cron-style timing), and then also have the
> option to update the stats I've already written for a window a set amount
> of time after the trigger fired, so that I can deal with events which
> arrive after the write for that window. And then there's a cut-off point
> after which updating the stats for a very old window is just not worth it.
> Something like this DSL:
>
> kstream.trigger(
>     /* when to trigger */ Cron.of("0 * * * *"),
>     /* update every hour afterwards */ Hours.toMillis(1),
>     /* discard changes older than this */ Hours.toMillis(24),
>     /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ });
>
> The tricky part is reconciling event source time and event processing
> time. Clearly this trigger is in the event processing time, whereas the
> data is most probably in the event source time.
>
> Something like that :)
>
> Ara.
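No such trigger DSL exists today, but its shape can be approximated with the low-level Processor API: key a state store by window start, and use ProcessorContext.schedule()/punctuate() to periodically flush finished windows and discard those past the cut-off. A rough sketch against the 0.10.0 API; the store name "hourly-stats", the key encoding, and writeToExternalDb() are invented for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class HourlyStatsWriter implements Processor<String, Long> {

        private static final long HOUR_MS = 60 * 60 * 1000L;
        private static final long CUTOFF_MS = 24 * HOUR_MS;

        private ProcessorContext context;
        private KeyValueStore<String, Long> stats;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.stats = (KeyValueStore<String, Long>) context.getStateStore("hourly-stats");
            context.schedule(HOUR_MS);  // request punctuate() about once per hour of stream time
        }

        @Override
        public void process(String key, Long value) {
            // Prefix keys with the zero-padded window start so that a single
            // range scan covers all entries of one window.
            long windowStart = (context.timestamp() / HOUR_MS) * HOUR_MS;
            String storeKey = String.format("%020d|%s", windowStart, key);
            Long old = stats.get(storeKey);
            stats.put(storeKey, old == null ? value : old + value);
        }

        @Override
        public void punctuate(long streamTime) {
            // Flush every window that closed before the current one; entries
            // older than the cut-off are dropped without being rewritten.
            String currentWindowPrefix = String.format("%020d|", (streamTime / HOUR_MS) * HOUR_MS);
            List<String> flushed = new ArrayList<>();
            try (KeyValueIterator<String, Long> it = stats.range("", currentWindowPrefix)) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    long windowStart = Long.parseLong(entry.key.substring(0, 20));
                    if (streamTime - windowStart <= CUTOFF_MS) {
                        writeToExternalDb(windowStart, entry.key.substring(21), entry.value);
                    }
                    flushed.add(entry.key);
                }
            }
            for (String key : flushed) {
                stats.delete(key);  // deleting here means late arrivals start a fresh row
            }
        }

        private void writeToExternalDb(long windowStart, String key, Long value) {
            // Hypothetical: hand off to an Accumulo BatchWriter, a JDBC batch, etc.
        }

        @Override
        public void close() {}
    }

One caveat that matters for the event-time vs. processing-time question above: in these releases punctuate() is driven by stream time (record timestamps), not wall-clock time, so a true cron-style wall-clock trigger cannot be expressed this way.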
> On Sep 26, 2016, at 1:59 AM, Michael Noll <mich...@confluent.io> wrote:
>
> Ara,
>
> may I ask why you need to use micro-batching in the first place?
>
> Reason why I am asking: typically, when people talk about micro-batching,
> they are referring to the way some originally batch-based stream
> processing tools "bolt on" real-time processing by making their batch
> sizes really small. Here, micro-batching belongs to the realm of the inner
> workings of the stream processing tool.
>
> Orthogonal to that, you have features/operations such as windowing,
> triggers, etc. that -- unlike micro-batching -- allow you, as the user of
> the stream processing tool, to define exactly which computation logic you
> need. Whether or not, say, windowing is computed via micro-batching behind
> the scenes should (at least in an ideal world) be of no concern to the
> user.
>
> -Michael
>
> On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> Hi,
>
> What's the best way to do micro-batching in Kafka Streams? Any plans for a
> built-in mechanism? Perhaps StateStore could act as the buffer? What
> exactly are ProcessorContext.schedule()/punctuate() for? They don't seem
> to be used anywhere?
>
> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
>
> Ara.
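On that last question: ProcessorContext.schedule(interval) is precisely the hook for this kind of micro-batching. It asks the framework to invoke the processor's punctuate(timestamp) callback periodically (driven by stream time in the 0.10.x releases), and a StateStore can indeed act as the buffer, which is what WordCountProcessorDemo does. A sketch of wiring such a processor into a topology, reusing the hypothetical HourlyStatsWriter from above (the topic name and config values are placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class MicroBatchTopology {

        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("source", "records-topic")
                   // Any Processor that buffers in process() and flushes in
                   // punctuate() fits here; HourlyStatsWriter is the sketch above.
                   .addProcessor("batcher", HourlyStatsWriter::new, "source")
                   // The store must be attached to the processor that uses it.
                   .addStateStore(
                       Stores.create("hourly-stats")
                             .withStringKeys()
                             .withLongValues()
                             .persistent()
                             .build(),
                       "batcher");

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "micro-batch-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

            KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
            streams.start();
        }
    }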
--
-- Guozhang