Thanks Matthias. My doubt is on a more fundamental level, but I'll ask on a separate thread as per Eno's recommendation.
On Tue, May 16, 2017 at 3:26 PM Matthias J. Sax <matth...@confluent.io> wrote:
> Just to add to this:
>
> Streams creates re-partitioning topics "lazily", meaning that when the key is (potentially) changed, we only set a flag but don't add the re-partitioning topic. On groupByKey() or join() we check whether this "re-partitioning required" flag is set and add the topic then.
>
> Thus, if you have
>
> stream.map().map().selectKey().groupBy()
>
> we only re-partition on groupBy, but not after the first two map() steps.
>
>
> -Matthias
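A minimal sketch of the pattern Matthias describes, using the 0.10.x-era DSL; the topic name, serdes, and lambdas are illustrative assumptions, not part of the thread:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    // Each key-changing step only marks the stream as "re-partitioning required";
    // the repartition topic itself is created once, when groupByKey() is reached.
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "events");

    stream.map((k, v) -> KeyValue.pair(v.substring(0, 1), v))   // key may change -> flag is set, no topic yet
          .map((k, v) -> KeyValue.pair(k.toLowerCase(), v))     // flag already set, still no topic
          .selectKey((k, v) -> k + "-" + v.length())            // still only the flag
          .groupByKey(Serdes.String(), Serdes.String())         // the single repartition topic is added here
          .count("counts");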
> On 5/16/17 8:18 AM, Eno Thereska wrote:
> > (it's preferred to create another email thread for a different topic to make it easier to look back)
> >
> > Yes, there could be room for optimizations, e.g., see this: http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3cCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3e
> >
> > Eno
> >
> >> On 16 May 2017, at 16:01, João Peixoto <joao.harti...@gmail.com> wrote:
> >>
> >> Follow-up doubt (let me know if a new question should be created).
> >>
> >> Do we always need a repartitioning topic?
> >>
> >> If I'm reading things correctly, when we change the key of a record we need to make sure the new key falls on the partition that we are processing. This makes a lot of sense if after such a change we'd need to join on some other stream/table, or in cases where we sink to a topic. However, in cases where we do none of these things, does the repartition topic do nothing? If this is true, can we somehow not create it?
> >>
> >> On Sun, May 14, 2017 at 7:58 PM João Peixoto <joao.harti...@gmail.com> wrote:
> >>
> >>> Very useful links, thank you.
> >>>
> >>> Part of my original misunderstanding was that the at-least-once guarantee was considered fulfilled if the record reached a sink node.
> >>>
> >>> Thanks for all the feedback, you may consider my question answered. Feel free to ask further questions about the use case if you find it interesting.
> >>>
> >>> On Sun, May 14, 2017 at 4:31 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>>> Yes.
> >>>>
> >>>> It is basically "documented", as Streams guarantees at-least-once semantics. Thus, we make sure you will not lose any data in case of failure. (I.e., the overall guarantee is documented.)
> >>>>
> >>>> To achieve this, we always flush before we commit offsets. (This is not explicitly documented as it's an implementation detail.)
> >>>>
> >>>> There are some docs in the wiki:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits
> >>>>
> >>>> This might also help in case you want to dig into the code:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 5/14/17 4:07 PM, João Peixoto wrote:
> >>>>> I think I now understand what Matthias meant when he said "If you use a global remote store, you would not need to back your changes in a changelog topic, as the store would not be lost in case of failure".
> >>>>>
> >>>>> I had the misconception that if a state store threw an exception during "flush", all messages received between now and the previous flush would be "lost", hence the need for a changelog topic. However, it seems that the "repartition" topic actually solves this problem.
> >>>>>
> >>>>> There's very little information about the latter, at least that I could find, but an entry seems to be added whenever a record enters the "aggregate", and the state store "consumer" of this topic only updates its offset after the flush completes, meaning that the repartition topic will be replayed! It seems this problem is already solved for me; I'd appreciate it if someone could point me to the documentation or code that backs up the above.
> >>>>>
> >>>>> On Sat, May 13, 2017 at 3:11 PM João Peixoto <joao.harti...@gmail.com> wrote:
> >>>>>
> >>>>>> Replies in line as well
> >>>>>>
> >>>>>> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi João,
> >>>>>>>
> >>>>>>> Some answers inline:
> >>>>>>>
> >>>>>>>> On 12 May 2017, at 18:27, João Peixoto <joao.harti...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for the comments, here are some clarifications:
> >>>>>>>>
> >>>>>>>> I did look at interactive queries; if I understood them correctly it means that my state store must hold all the results in order for it to be queried, either in memory or through disk (RocksDB).
> >>>>>>>
> >>>>>>> Yes, that's correct.
> >>>>>>>
> >>>>>>>> 1. The retention policy on my aggregate operations, in my case, is 90 days, which is way too much data to hold in memory
> >>>>>>>
> >>>>>>> It will depend on how much data/memory you have, but perhaps it could be too much to hold in memory for that long (especially because some failure is bound to happen in 90 days)
> >>>>>>>
> >>>>>>>> 2. My stream instances do not have access to disk; even if they did, wouldn't it mean I'd need almost twice the disk space to hold the same data? I.e. Kafka brokers holding the topics + RocksDB holding the state?
> >>>>>>>
> >>>>>>> That's interesting, is there a reason why the streams instances don't have access to a local file system? I'm curious what kind of deployment you have.
> >>>>>>
> >>>>>> My instances are deployed on Kubernetes.
> >>>>>>
> >>>>>>> It is true that twice the disk space is needed to hold the data on RocksDB as well as in the Kafka changelog topic; however, that is no different than the current situation where the data is stored to a remote database, right? I understand your point that you might not have access to local disk though.
> >>>>>>
> >>>>>> Not quite. In my scenario the state store would be backed by the changelog AND MongoDB. On bootstrap we would fetch the state stored in the changelog into a structure like a LoadingCache (from Guava), which would load additional fields from MongoDB if needed. Therefore the changelog could hold, say, 12 hours of records while the full 90 days is stored in MongoDB only. That's exactly the whole strategy I'm trying to validate. The implication would be that in case of failure we'd need to recover within 12 hours, or else we'd need to replay from the source topics.
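A rough sketch of the bootstrap structure João describes, assuming Guava's LoadingCache and the MongoDB Java driver; the class, method names, cache bounds, and document layout are illustrative assumptions only:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    import java.util.concurrent.TimeUnit;

    // Hypothetical cache: recent aggregates are restored from the changelog on
    // bootstrap; anything older is loaded lazily from MongoDB on first access.
    public class AggregateCache {

        private final LoadingCache<String, Document> cache;

        public AggregateCache(MongoCollection<Document> collection) {
            this.cache = CacheBuilder.newBuilder()
                    .expireAfterAccess(12, TimeUnit.HOURS)  // mirrors the assumed 12h changelog window
                    .maximumSize(1_000_000)                  // illustrative bound
                    .build(new CacheLoader<String, Document>() {
                        @Override
                        public Document load(String key) {
                            // Fall back to MongoDB for aggregates not covered by the changelog.
                            Document doc = collection.find(new Document("_id", key)).first();
                            return doc != null ? doc : new Document("_id", key);
                        }
                    });
        }

        // Called while replaying the changelog during state-store restoration.
        public void putFromChangelog(String key, Document value) {
            cache.put(key, value);
        }

        public Document get(String key) {
            return cache.getUnchecked(key);
        }
    }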
> >>>>>>>> 3. Because a crash may happen between when an entry is added to the changelog and when the data store is flushed, I need to get all the changes every time if I want to guarantee that all data is eventually persisted. This is why checkpoint files may not work for me.
> >>>>>>>
> >>>>>>> The upcoming exactly-once support in 0.11 will help with these kinds of guarantees.
> >>>>>>
> >>>>>> Not sure I understand how exactly-once would help in this case.
> >>>>>>
> >>>>>>>> Standby tasks look great, I forgot about those.
> >>>>>>>>
> >>>>>>>> I'm at the design phase so this is all tentative. Answering Matthias' questions:
> >>>>>>>>
> >>>>>>>> My state stores are local. As mentioned above I do not have access to disk, therefore I need to recover all data from somewhere; in this case I'm thinking about the changelog.
> >>>>>>>
> >>>>>>> So this is where I get confused a bit, since you mention that your state stores are "local", i.e., the streams instance does have access to a local file system.
> >>>>>>
> >>>>>> When I said "local" I meant that the state stores are partial for each instance, i.e. they only have the partitions the task is responsible for (normal behavior) rather than a global store.
> >>>>>>
> >>>>>>>> I read about Kafka Connect but have never used it; maybe that'll simplify things, but I need to do some studying there.
> >>>>>>>>
> >>>>>>>> The reason why, even though my stores are local, I still want to store them in a database and not use straight-up RocksDB (or global stores) is that this would allow me to migrate my current processing pipeline to Kafka Streams without needing to change the frontend part of the application, which fetches data from MongoDB.
> >>>>>>>
> >>>>>>> Makes sense.
> >>>>>>>
> >>>>>>>> PS When you mention Global State Stores I'm thinking of http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application , is this correct?
> >>>>>>>
> >>>>>>> No, I think Matthias is saying that you have a remote server somewhere where you store all your data (like a shared file system). This is not something Kafka would provide.
> >>>>>>>
> >>>>>>> Eno
> >>>>>>>
> >>>>>>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> I am not sure about your overall setup. Are your stores local (similar to RocksDB) or are you using one global remote store? If you use a global remote store, you would not need to back your changes in a changelog topic, as the store would not be lost in case of failure.
> >>>>>>>>>
> >>>>>>>>> Also (in case that your stores are remote), did you consider using Kafka Connect to export your data into an external store like MySQL or MongoDB instead of writing your own custom stores for Streams?
> >>>>>>>>>
> >>>>>>>>> If your stores are local, why do you write custom stores? I am curious to understand why RocksDB does not serve your needs.
> >>>>>>>>>
> >>>>>>>>> About your two comments:
> >>>>>>>>>
> >>>>>>>>> (1) Streams uses RocksDB by default, and the default implementation uses "checkpoint files" in the next release. Those checkpoint files track the changelog offsets of the data that got flushed to disk. This allows us to reduce the startup time, as only the tail of the changelog needs to be read to bring the store up to date. For this, you would always (1) write to the changelog, (2) write to your store. Each time you need to flush, you know that all data is in the changelog already. After each flush, you can update the "local offset checkpoint file".
> >>>>>>>>>
> >>>>>>>>> I guess, if you use local stores, you can apply a similar pattern in your custom store implementation. (And as mentioned above, for a global remote store you would not need the changelog anyway. -- This also applies to your recovery question from below.)
> >>>>>>>>>
> >>>>>>>>> (2) You can configure standby tasks (via StreamsConfig "num.standby.replicas"). This will set up standby tasks that passively replicate your stores to another instance. In the error case, state will be migrated to those "hot standbys", reducing recovery time significantly.
> >>>>>>>>>
> >>>>>>>>> About your questions:
> >>>>>>>>>
> >>>>>>>>> (1) Yes.
> >>>>>>>>> (2) Partly parallel (i.e., if you run with multiple threads -- cf. StreamsConfig "num.stream.threads"). Each thread flushes all its stores sequentially.
> >>>>>>>>> (3) Yes. There will be a store for each partition. (If the store is local.)
> >>>>>>>>> (4) Yes. The overall processing loop is sequential (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture). Also, the next commit point is computed after a successful commit -- thus, if one commit is delayed, all consecutive commit points are "shifted" by this delay.
> >>>>>>>>>
> >>>>>>>>> -Matthias
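A minimal sketch of the ordering Matthias describes in (1), as it might look inside a custom store; ChangelogAppender, the persist callback, and the checkpoint callback are hypothetical stand-ins, not Streams APIs:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.BiConsumer;
    import java.util.function.LongConsumer;

    // "Changelog first, checkpoint after flush": every update is written to the
    // changelog before the store itself, so a crash between the two can always be
    // repaired by replaying the changelog tail since the last checkpointed offset.
    public class ChangelogFirstStore {

        public interface ChangelogAppender {
            long append(String key, byte[] value); // returns the offset written to the changelog topic
        }

        private final Map<String, byte[]> pending = new ConcurrentHashMap<>();
        private final ChangelogAppender changelog;
        private final BiConsumer<String, byte[]> persistToDatabase; // e.g. a MongoDB upsert
        private final LongConsumer writeCheckpoint;                  // records "flushed up to offset X"
        private long lastLoggedOffset = -1L;

        public ChangelogFirstStore(ChangelogAppender changelog,
                                   BiConsumer<String, byte[]> persistToDatabase,
                                   LongConsumer writeCheckpoint) {
            this.changelog = changelog;
            this.persistToDatabase = persistToDatabase;
            this.writeCheckpoint = writeCheckpoint;
        }

        public void put(String key, byte[] value) {
            lastLoggedOffset = changelog.append(key, value); // 1. changelog first
            pending.put(key, value);                          // 2. then the store itself
        }

        public void flush() {
            pending.forEach(persistToDatabase);               // push pending data to the external database
            pending.clear();
            writeCheckpoint.accept(lastLoggedOffset);         // only after a successful flush advance the checkpoint
        }
    }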
> >>>>>>>>>
> >>>>>>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
> >>>>>>>>>> On a stream definition I perform an "aggregate" which is configured with a state store.
> >>>>>>>>>>
> >>>>>>>>>> *Goal*: Persist the aggregation results into a database, e.g. MySQL or MongoDB.
> >>>>>>>>>>
> >>>>>>>>>> *Working approach*:
> >>>>>>>>>> I have created a custom StateStore backed by a changelog topic, like the built-in state stores. Whenever the store gets flushed I save to the database, mark the record as being persisted and log the change in the changelog.
> >>>>>>>>>>
> >>>>>>>>>> If something goes wrong during processing, the changelog guarantees that I do not lose data, restores the state, and if some data point was not persisted, the next stream instance will persist it on its flush operation.
> >>>>>>>>>>
> >>>>>>>>>> 1. I cannot store too much data in the changelog; even with compaction, if I have too much data, bootstrapping a stream instance would take a long time.
> >>>>>>>>>> 2. On the other hand, if I take too long to recover from a failure, I may lose data. So there seems to be a delicate tradeoff here.
> >>>>>>>>>>
> >>>>>>>>>> *Questions*:
> >>>>>>>>>>
> >>>>>>>>>> 1. Is this a reasonable use case?
> >>>>>>>>>> 2. In a scenario where my stream would have a fanout (multiple sub-streams based on the same stream), each branch would perform different "aggregate" operations, each with its own state store. Are state stores flushed in parallel or sequentially?
> >>>>>>>>>> 3. The above also applies per partition. As a stream definition is parallelized by partition, will one instance hold different store instances for each one?
> >>>>>>>>>> 4. Through synthetic sleeps I simulated slow flushes, slower than the commit interval. The stream seems to be OK with it and didn't throw; I assume the Kafka consumer does not poll more records until all of the previous poll's records are committed, but I couldn't find documentation to back this statement. Is there a timeout for "commit" operations?
> >>>>>>>>>>
> >>>>>>>>>> Sample code:
> >>>>>>>>>>
> >>>>>>>>>> public class AggregateHolder {
> >>>>>>>>>>
> >>>>>>>>>>     private Long commonKey;
> >>>>>>>>>>     private List<Double> rawValues = new ArrayList<>();
> >>>>>>>>>>     private boolean persisted;
> >>>>>>>>>>
> >>>>>>>>>>     // ...
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> And stream definition:
> >>>>>>>>>>
> >>>>>>>>>> source.groupByKey(Serdes.String(), recordSerdes)
> >>>>>>>>>>       .aggregate(
> >>>>>>>>>>           AggregateHolder::new,
> >>>>>>>>>>           (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
> >>>>>>>>>>           new DemoStoreSupplier<>(/* ... */)
> >>>>>>>>>>       )
> >>>>>>>>>>       .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})", key, agg.getAverage(), agg));
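For reference, a minimal configuration sketch tying together two points raised above: the commit interval that drives the flush cycle (question 4) and the standby replicas Matthias suggests in (2) for faster recovery. The application id, bootstrap servers, and all values are illustrative assumptions, not recommendations:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    // Illustrative StreamsConfig settings for the setup discussed in this thread.
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-to-mongodb");   // hypothetical app id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");          // hypothetical broker
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);               // flush stores + commit offsets every 30s
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);                  // keep one hot standby of each store
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);                    // each thread flushes its stores sequentially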