> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such as
> detecting all state store changelogs by doing the equivalent of
> `ls *-changelog`, then this will miss changelogs of KTables that are
> created by `through()` and `to()` [...]

Addendum: And that's because the topic that is created by
`KTable#through()` and `KTable#to()` is, by definition, a changelog of that
KTable and the associated state store.
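
To make the tooling caveat concrete, here is a rough sketch in plain Java (no Kafka dependency). It assumes the `<application.id>-<store name>-changelog` pattern for internal changelog topics that Mikael describes further down in this thread. The class and method names (`ChangelogTopics`, `internalChangelogTopic`, `findBySuffix`) and the topic list are made up for illustration; they are not part of the Kafka Streams API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class ChangelogTopics {
    static final String SUFFIX = "-changelog";

    // Assumed naming pattern for internal changelog topics:
    // <application.id>-<store name>-changelog
    static String internalChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + SUFFIX;
    }

    // The equivalent of `ls *-changelog` over a list of topic names.
    static List<String> findBySuffix(List<String> topics) {
        return topics.stream()
                .filter(topic -> topic.endsWith(SUFFIX))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up topic list: an input topic, the internal changelog of
        // "store-1", and the user-named topic passed to through().
        List<String> topics = Arrays.asList(
                "input-topic",
                internalChangelogTopic("my-app-id", "store-1"),
                "through-topic");

        // Only the internal changelog matches; "through-topic" is missed even
        // though it is the changelog of the KTable and of "store-2".
        System.out.println(findBySuffix(topics));
    }
}
```

If you name the topic yourself with the suffix, e.g. `through("through-topic-changelog", "store-2")`, the same suffix scan would pick it up as well.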



On Wed, Nov 23, 2016 at 4:15 PM, Michael Noll <mich...@confluent.io> wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without the "..." ellipsis, be something like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is
> the changelog topic for both the KTable and the state store "store-2".  So
> this is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then
> you'd need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()`, `stream()`, or
> `table()` -- requires you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the Javadoc of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such as
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqv...@gmail.com>
> wrote:
>
>> Hi Eno,
>>
>> 1) Great :)
>>
>> 2) Yes, we are using the Interactive Queries to access the state stores. In
>> addition, we access the changelogs to subscribe to updates. For this reason
>> we need to know the changelog topic name.
>>
>> Thanks,
>> Mikael
>>
>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>
>> > Hi Mikael,
>> >
>> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
>> > into fixing it. I agree that it can be confusing to have topic names that
>> > are not what one would expect.
>> >
>> > 2) If your goal is to query/read from the state stores, you can use
>> > Interactive Queries to do that (you don't need to worry about the changelog
>> > topic name and such). Interactive Queries is a new feature in 0.10.1 (blog
>> > here:
>> > https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>> > ).
>> >
>> > Thanks
>> > Eno
>> >
>> >
>> > > On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqv...@gmail.com> wrote:
>> > >
>> > > Sorry for being unclear, I'll try again :)
>> > >
>> > > 1) The JavaDoc for through is not correct, it states that a changelog
>> > > topic will be created for the state store. That is, if I would call it
>> > > with through("topic", "a-store"), I would expect a Kafka topic
>> > > "my-app-id-a-store-changelog" to be created.
>> > >
>> > > 2) Regarding the use case, the topology looks like this:
>> > >
>> > > .stream(...)
>> > > .aggregate(..., "store-1")
>> > > .mapValues(...)
>> > > .through(..., "store-2")
>> > >
>> > > Basically, I want to materialize both the result from the aggregate
>> > > method and the result from mapValues, which is materialized using
>> > > .through(). Later, I will access both the tables (store-1 and store-2) to
>> > > a) get the current state of the aggregate, b) subscribe to future updates.
>> > > This works just fine. The only issue is that I assumed a changelog topic
>> > > for store-2 would be created automatically, which didn't happen.
>> > >
>> > > Since I want to access the changelog topic, it helps if the naming is
>> > > consistent. So either we enforce the same naming pattern as Kafka when
>> > > calling .through(), or alternatively the Kafka Streams API can provide a
>> > > method to materialize tables which creates a topic name according to the
>> > > naming pattern, e.g. .through() without the topic parameter.
>> > >
>> > > What do you think?
>> > >
>> > > Best,
>> > > Mikael
>> > >
>> > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matth...@confluent.io>
>> > > wrote:
>> > >
>> > >> I cannot completely follow what you want to achieve.
>> > >>
>> > >> However, the JavaDoc for through() seems not to be correct to me. Using
>> > >> through() will not create an extra internal changelog topic with the
>> > >> described naming schema, because the topic specified in through() can be
>> > >> used for this (there is no point in duplicating the data).
>> > >>
>> > >> If you have a KTable and apply a mapValues(), this will not write data
>> > >> to any topic. The derived KTable is in-memory because you can easily
>> > >> recreate it from its base KTable.
>> > >>
>> > >> What is the missing part you want to get?
>> > >>
>> > >> Btw: the internally created changelog topics are only used for recovery
>> > >> in case of failure. Streams does not consume from those topics during
>> > >> "normal operation".
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >>
>> > >>
>> > >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
>> > >>> Hi,
>> > >>>
>> > >>> in the documentation for KTable#through, it is stated that a new
>> > >>> changelog topic will be created for the table. It also states that
>> > >>> calling through is equivalent to calling #to followed by
>> > >>> KStreamBuilder#table.
>> > >>>
>> > >>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
>> > >>>
>> > >>> In the docs for KStreamBuilder#table it is stated that no new changelog
>> > >>> topic will be created since the underlying topic acts as the changelog.
>> > >>> I've verified that this is the case.
>> > >>>
>> > >>> Is there another API method to materialize the results of a KTable
>> > >>> including a changelog, i.e. such that Kafka Streams creates the topic and
>> > >>> uses the naming schema for changelog topics? The use case I have in mind
>> > >>> is aggregate followed by mapValues.
>> > >>>
>> > >>> Best,
>> > >>> Mikael
