Hi again Patrik,

Actually, this is a good question... Can you share some context about why
you need to convert a stream to a table (including nulls as retractions)?
Thanks,
-John

On Fri, Oct 26, 2018 at 5:36 PM Matthias J. Sax <matth...@confluent.io> wrote:

> I don't know your overall application setup. However, a KStream
> semantically models immutable facts and there is no update semantic.
> Thus, it seems semantically questionable to allow changing the
> semantics from facts to updates (the other way around is easier IMHO,
> and thus supported via KTable#toStream()).
>
> Does this make sense?
>
> Having said this: you _can_ write a KStream into a topic and read it
> back as a KTable. But it's semantically questionable to do so, IMHO.
> Maybe it makes sense for your specific application, but in general I
> don't think it does.
>
>
> -Matthias
>
> On 10/26/18 9:30 AM, John Roesler wrote:
> > Hi Patrik,
> >
> > Just to drop one observation in... Streaming to a topic and then
> > consuming it as a table does create overhead, but so does reducing a
> > stream to a table, and I think it's actually the same in either case.
> >
> > They both require a store to collect the table state, and in both
> > cases, the stores need to have a changelog topic. For the "reduce"
> > version, it's an internal changelog topic, and for the
> > "topic-to-table" version, the store can use the intermediate topic as
> > its changelog.
> >
> > This doesn't address your ergonomic concern, but it seemed worth
> > pointing out that (as far as I can tell), there doesn't seem to be a
> > difference in overhead.
> >
> > Hope this helps!
> > -John
> >
> > On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> >
> >> Hello Matthias,
> >> thank you for the explanation.
> >> Streaming back to a topic and consuming this as a KTable does respect
> >> the null values as deletes, correct? But at the price of some overhead.
> >> Is there any (historical, technical or emotional ;-)) reason that no
> >> simple one-step stream-to-table operation exists?
> >> Best regards
> >> Patrik
> >>
> >>> Am 26.10.2018 um 00:07 schrieb Matthias J.
> >>> Sax <matth...@confluent.io>:
> >>>
> >>> Patrik,
> >>>
> >>> `null` values in a KStream don't have delete semantics (it's not a
> >>> changelog stream). That's why we drop them in the KStream#reduce
> >>> implementation.
> >>>
> >>> If you want to explicitly remove results for a key from the result
> >>> KTable, your `Reducer#apply()` implementation must return `null` -- the
> >>> result of #apply() has changelog/KTable semantics and `null` is
> >>> interpreted as a delete for this case.
> >>>
> >>> If you want to use `null` from your KStream to trigger reduce() to
> >>> delete, you will need to use a surrogate value for this, ie, do a
> >>> mapValues() before the groupByKey() call, and replace `null` values
> >>> with the surrogate-delete-marker that you can evaluate in
> >>> `Reducer#apply()` to return `null` for this case.
> >>>
> >>> Hope this helps.
> >>>
> >>> -Matthias
> >>>
> >>>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> >>>> Hello
> >>>>
> >>>> Recently we noticed a lot of warning messages in the logs which
> >>>> pointed to this method (we are running 2.0):
> >>>>
> >>>> KStreamReduce
> >>>> public void process(final K key, final V value) {
> >>>>     // If the key or value is null we don't need to proceed
> >>>>     if (key == null || value == null) {
> >>>>         LOG.warn(
> >>>>             "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
> >>>>             key, value, context().topic(), context().partition(), context().offset()
> >>>>         );
> >>>>         metrics.skippedRecordsSensor().record();
> >>>>         return;
> >>>>     }
> >>>>
> >>>> This was triggered for every record from a stream with an existing
> >>>> key but a null value which we put through groupBy/reduce to get a
> >>>> KTable. My assumption was that this was the correct way inside a
> >>>> streams application to get a KTable but this prevents deletion of
> >>>> records from working.
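Matthias's surrogate-delete-marker suggestion can be sketched in plain Java. This is a toy illustration, not Kafka Streams API code: the marker value, the String types, and the class name are all assumptions. In a real topology, something like `toSurrogate()` would run inside `mapValues()` before `groupByKey()`, and `apply()` would be the `Reducer` passed to `reduce()`.

```java
// Toy sketch of the surrogate-delete-marker approach (assumed marker and
// types): mapValues() replaces null with a marker so reduce() sees the
// record, and Reducer#apply() turns the marker back into null, which has
// delete semantics in the result KTable.
public class SurrogateDeleteSketch {

    // Hypothetical sentinel; in practice pick a value that cannot
    // collide with real payloads.
    static final String DELETE_MARKER = "__DELETE__";

    // What the mapValues() step would do: keep non-null values, replace
    // null with the marker so KStream#reduce does not drop the record.
    static String toSurrogate(String value) {
        return value == null ? DELETE_MARKER : value;
    }

    // What Reducer#apply() would do: returning null here is interpreted
    // as a delete for the key in the resulting KTable.
    static String apply(String aggregate, String newValue) {
        return DELETE_MARKER.equals(newValue) ? null : newValue;
    }

    public static void main(String[] args) {
        System.out.println(toSurrogate(null));          // __DELETE__
        System.out.println(apply("old", "__DELETE__")); // null
        System.out.println(apply("old", "new"));        // new
    }
}
```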
> >>>>
> >>>> Our alternative is to send the stream back to a named topic and
> >>>> build a new table from it, but this is rather cumbersome and
> >>>> requires a separate topic which also can't be cleaned up by the
> >>>> streams reset tool.
> >>>>
> >>>> Did I miss anything relevant here?
> >>>> Would it be possible to create a separate method for KStream to
> >>>> achieve this directly?
> >>>>
> >>>> best regards
> >>>>
> >>>> Patrik
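The distinction this thread keeps returning to, a null record read from a changelog topic acting as a tombstone versus a null record being dropped by KStream#reduce, can be modeled in a few lines of plain Java. This is a toy stand-in for the two materialization paths, not actual Kafka code; the class and method names are invented for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka code) of why topic-to-table keeps deletes while
// groupBy/reduce does not: a table materialized from a topic treats a
// null value as a tombstone that removes the key, whereas KStreamReduce
// skips null-valued records before they ever reach the store.
public class TableSemanticsSketch {

    // Models consuming a topic as a table: a null value deletes the key.
    static Map<String, String> asTable(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                table.remove(r.getKey());   // tombstone: delete the key
            } else {
                table.put(r.getKey(), r.getValue());
            }
        }
        return table;
    }

    // Models groupByKey().reduce(...): null-valued records are skipped,
    // so an earlier value for the key survives.
    static Map<String, String> viaReduce(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                continue;                   // KStreamReduce drops the record
            }
            table.put(r.getKey(), r.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                new SimpleEntry<>("a", "1"),
                new SimpleEntry<>("a", null));
        System.out.println(asTable(records).containsKey("a"));  // false
        System.out.println(viaReduce(records).get("a"));        // 1
    }
}
```

With the same input, the key "a" is gone from the table built by replaying tombstones but still present after the reduce-style path, which is exactly the behavior Patrik observed.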