> - Also, in case you want to do some shenanigans (like for some tooling you're building
> around state stores/changelogs/interactive queries) such as detecting all state store changelogs
> by doing the equivalent of `ls *-changelog`, then this will miss changelogs of KTables that are
> created by `through()` and `to()` [...]
Addendum: And that's because the topic that is created by `KTable#through()` and
`KTable#to()` is, by definition, a changelog of that KTable and the associated state store.

On Wed, Nov 23, 2016 at 4:15 PM, Michael Noll <mich...@confluent.io> wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without the "..." ellipsis, be something like
> `KTable#through("through-topic", "store-2")`. Here, "through-topic" is
> the changelog topic for both the KTable and the state store "store-2". So
> this is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then
> you'd need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically: that's because `through()` -- like `to()`, `stream()`, and
> `table()` -- requires you to explicitly provide a topic name, and of course
> Kafka will use exactly this name. (FWIW, the `-changelog` suffix is only
> added when Kafka Streams creates internal changelog topics behind the scenes
> for you.) Unfortunately, the javadocs of `KTable#through()` are incorrect
> because they refer to `-changelog`; we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such as
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
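[Editor's note: for readers who want a concrete version of the topology Michael and Mikael discuss above, here is a minimal sketch against the 0.10.1 API. All names ("events", "store-1", "store-2", "my-app-id"), the serdes, and the aggregation logic are illustrative placeholders, not taken from the thread; the fragment is meant to sit inside your topology-building code and only shows where the explicit "-changelog" naming convention for through() would go.]

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    KStreamBuilder builder = new KStreamBuilder();

    // "store-1" gets an internal changelog topic created for it automatically,
    // named "<application.id>-store-1-changelog" (e.g. "my-app-id-store-1-changelog").
    KTable<String, Long> aggregated = builder
        .stream(Serdes.String(), Serdes.String(), "events")
        .groupByKey(Serdes.String(), Serdes.String())
        .aggregate(
            () -> 0L,                        // initializer
            (key, value, agg) -> agg + 1L,   // aggregator: count per key
            Serdes.Long(),
            "store-1");

    // mapValues() by itself writes to no topic; through() materializes the
    // mapped table into the topic you name, and that topic *is* the changelog
    // of "store-2". Adding the "-changelog" suffix yourself keeps the name
    // consistent with the internally created changelogs.
    KTable<String, Long> mapped = aggregated
        .mapValues(v -> v * 2)
        .through(Serdes.String(), Serdes.Long(),
                 "my-app-id-store-2-changelog",
                 "store-2");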
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqv...@gmail.com> wrote:
>
>> Hi Eno,
>>
>> 1) Great :)
>>
>> 2) Yes, we are using Interactive Queries to access the state stores. In
>> addition, we access the changelogs to subscribe to updates. For this reason
>> we need to know the changelog topic name.
>>
>> Thanks,
>> Mikael
>>
>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.there...@gmail.com> wrote:
>>
>> > Hi Mikael,
>> >
>> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
>> > into fixing it. I agree that it can be confusing to have topic names that
>> > are not what one would expect.
>> >
>> > 2) If your goal is to query/read from the state stores, you can use
>> > Interactive Queries to do that (you don't need to worry about the changelog
>> > topic name and such). Interactive Queries is a new feature in 0.10.1 (blog
>> > here:
>> > https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/ ).
>> >
>> > Thanks
>> > Eno
>> >
>> > > On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqv...@gmail.com> wrote:
>> > >
>> > > Sorry for being unclear, I'll try again :)
>> > >
>> > > 1) The JavaDoc for through() is not correct: it states that a changelog
>> > > topic will be created for the state store. That is, if I call it with
>> > > through("topic", "a-store"), I would expect a Kafka topic
>> > > "my-app-id-a-store-changelog" to be created.
>> > >
>> > > 2) Regarding the use case, the topology looks like this:
>> > >
>> > > .stream(...)
>> > > .aggregate(..., "store-1")
>> > > .mapValues(...)
>> > > .through(..., "store-2")
>> > >
>> > > Basically, I want to materialize both the result from the aggregate method
>> > > and the result from mapValues, which is materialized using .through().
>> > > Later, I will access both tables (store-1 and store-2) to a) get the
>> > > current state of the aggregate and b) subscribe to future updates. This
>> > > works just fine. The only issue is that I assumed a changelog topic for
>> > > store-2 would be created automatically, which didn't happen.
>> > >
>> > > Since I want to access the changelog topic, it helps if the naming is
>> > > consistent. So either we enforce the same naming pattern as Kafka when
>> > > calling .through(), or alternatively the Kafka Streams API can provide a
>> > > method to materialize tables that creates a topic name according to the
>> > > naming pattern, e.g. .through() without the topic parameter.
>> > >
>> > > What do you think?
>> > >
>> > > Best,
>> > > Mikael
>> > >
>> > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matth...@confluent.io> wrote:
>> > >
>> > >> I cannot completely follow what you want to achieve.
>> > >>
>> > >> However, the JavaDoc for through() seems not to be correct to me. Using
>> > >> through() will not create an extra internal changelog topic with the
>> > >> described naming schema, because the topic specified in through() can be
>> > >> used for this (there is no point in duplicating the data).
>> > >>
>> > >> If you have a KTable and apply a mapValues(), this will not write data
>> > >> to any topic. The derived KTable is in-memory because you can easily
>> > >> recreate it from its base KTable.
>> > >>
>> > >> What is the missing part you want to get?
>> > >>
>> > >> Btw: the internally created changelog topics are only used for recovery
>> > >> in case of failure. Streams does not consume from those topics during
>> > >> "normal operation".
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >>
>> > >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
>> > >>> Hi,
>> > >>>
>> > >>> in the documentation for KTable#through, it is stated that a new changelog
>> > >>> topic will be created for the table. It also states that calling through
>> > >>> is equivalent to calling #to followed by KStreamBuilder#table.
>> > >>>
>> > >>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
>> > >>>
>> > >>> In the docs for KStreamBuilder#table it is stated that no new changelog
>> > >>> topic will be created since the underlying topic acts as the changelog.
>> > >>> I've verified that this is the case.
>> > >>>
>> > >>> Is there another API method to materialize the results of a KTable
>> > >>> including a changelog, i.e. such that Kafka Streams creates the topic and
>> > >>> uses the naming schema for changelog topics? The use case I have in mind
>> > >>> is aggregate followed by mapValues.
>> > >>>
>> > >>> Best,
>> > >>> Mikael
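[Editor's note: the Interactive Queries approach Eno points to above looks roughly like the following minimal sketch (0.10.1+). The store name "store-1" and the key/value types follow the hypothetical topology sketched earlier; `builder` and `props` are assumed to be your already-built KStreamBuilder and configuration Properties, including an application.id such as "my-app-id".]

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // Query the materialized aggregate directly, without touching the
    // changelog topic. The store must be hosted by this instance and
    // fully initialized before it becomes queryable.
    ReadOnlyKeyValueStore<String, Long> store =
        streams.store("store-1", QueryableStoreTypes.<String, Long>keyValueStore());

    Long current = store.get("some-key");  // a) current state of the aggregate

For part b) of Mikael's use case (subscribing to updates), the internally created changelog of "store-1" follows the "<application.id>-<store name>-changelog" schema (e.g. "my-app-id-store-1-changelog") and can be consumed like any other topic; for "store-2", the topic passed to through() plays that role, which is why a consistent "-changelog" naming convention on through() topics helps.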