Hi again Patrik,

Actually, this is a good question... Can you share some context about why
you need to convert a stream to a table (including nulls as retractions)?

Thanks,
-John

On Fri, Oct 26, 2018 at 5:36 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I don't know your overall application setup. However, a KStream
> semantically models immutable facts and there are no update semantics.
> Thus, it seems semantically questionable to allow changing the
> semantics from facts to updates (the other way is easier IMHO, and thus
> supported via KTable#toStream()).
>
> Does this make sense?
>
> Having said this: you _can_ write a KStream into a topic and read it back
> as a KTable. But it's semantically questionable to do so, IMHO. Maybe it
> makes sense for your specific application, but in general I don't think
> it makes sense.
>
>
> -Matthias
>
> On 10/26/18 9:30 AM, John Roesler wrote:
> > Hi Patrik,
> >
> > Just to drop one observation in... Streaming to a topic and then
> > consuming it as a table does create overhead, but so does reducing a
> > stream to a table, and I think it's actually the same in either case.
> >
> > They both require a store to collect the table state, and in both cases,
> > the stores need to have a changelog topic. For the "reduce" version, it's
> > an internal changelog topic, and for the "topic-to-table" version, the
> > store can use the intermediate topic as its changelog.
> >
> > This doesn't address your ergonomic concern, but it seemed worth pointing
> > out that (as far as I can tell), there doesn't seem to be a difference in
> > overhead.
> >
> > Hope this helps!
> > -John
> >
> > On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> >
> >> Hello Matthias,
> >> thank you for the explanation.
> >> Streaming back to a topic and consuming this as a KTable does respect
> >> the null values as deletes, correct? But at the price of some overhead.
> >> Is there any (historical, technical or emotional;-)) reason that no
> >> simple one-step stream-to-table operation exists?
> >> Best regards
> >> Patrik
> >>
> >>> On 26.10.2018 at 00:07, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>> Patrik,
> >>>
> >>> `null` values in a KStream don't have delete semantics (it's not a
> >>> changelog stream). That's why we drop them in the KStream#reduce
> >>> implementation.
> >>>
> >>> If you want to explicitly remove results for a key from the result
> >>> KTable, your `Reducer#apply()` implementation must return `null` -- the
> >>> result of #apply() has changelog/KTable semantics and `null` is
> >>> interpreted as delete for this case.
> >>>
> >>> If you want to use `null` from your KStream to trigger reduce() to
> >>> delete, you will need to use a surrogate value for this, i.e., do a
> >>> mapValues() before the groupByKey() call, and replace `null` values with
> >>> the surrogate-delete-marker that you can evaluate in `Reducer#apply()`
> >>> to return `null` for this case.
> >>>
> >>> Hope this helps.
> >>>
> >>> -Matthias
> >>>
> >>>> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> >>>> Hello
> >>>>
> >>>> Recently we noticed a lot of warning messages in the logs which
> >>>> pointed to this method (we are running 2.0):
> >>>>
> >>>> KStreamReduce
> >>>> public void process(final K key, final V value) {
> >>>>            // If the key or value is null we don't need to proceed
> >>>>            if (key == null || value == null) {
> >>>>                LOG.warn(
> >>>>                    "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
> >>>>                    key, value, context().topic(), context().partition(), context().offset()
> >>>>                );
> >>>>                metrics.skippedRecordsSensor().record();
> >>>>                return;
> >>>>            }
> >>>>
> >>>> This was triggered for every record from a stream with an existing key
> >>>> but a null value which we put through groupBy/reduce to get a KTable.
> >>>> My assumption was that this was the correct way inside a streams
> >>>> application to get a KTable but this prevents deletion of records from
> >>>> working.
> >>>>
> >>>> Our alternative is to send the stream back to a named topic and build a
> >>>> new table from it, but this is rather cumbersome and requires a separate
> >>>> topic which also can't be cleaned up by the streams reset tool.
> >>>>
> >>>> Did I miss anything relevant here?
> >>>> Would it be possible to create a separate method for KStream to
> >>>> achieve this directly?
> >>>>
> >>>> best regards
> >>>>
> >>>> Patrik
> >>>>
> >>>
> >>
> >
>
>
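
For readers following the quoted thread, here is a rough plain-Java model of the three behaviours discussed above: reduce() skipping null values, the surrogate-delete-marker workaround, and reading a topic back as a table. It deliberately does not use the Kafka Streams API. The class name, the `DELETE_MARKER` value and all method names are hypothetical, and the rule that the first value for a key is stored without calling the reducer is an assumption of this model. Treat it as a sketch of the semantics, not as the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of the semantics discussed in this thread.
// NOT the Kafka Streams API; every name below is hypothetical.
public class StreamToTableSketch {

    // Hypothetical surrogate value standing in for "delete this key".
    static final String DELETE_MARKER = "__DELETE__";

    // Reducer that keeps the latest value, or returns null (= delete)
    // when the latest value is the surrogate marker.
    static final BinaryOperator<String> LATEST_OR_DELETE =
        (oldValue, newValue) -> DELETE_MARKER.equals(newValue) ? null : newValue;

    // Models groupByKey().reduce(): records with a null key or value are
    // skipped, and a null result from the reducer removes the key.
    // Assumption: the first value for a key is stored as-is.
    static Map<String, String> reduceToTable(List<String[]> records,
                                             BinaryOperator<String> reducer) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            if (key == null || value == null) {
                continue; // corresponds to the "Skipping record" warning
            }
            String old = table.get(key);
            String result = (old == null) ? value : reducer.apply(old, value);
            if (result == null) {
                table.remove(key);
            } else {
                table.put(key, result);
            }
        }
        return table;
    }

    // Models the mapValues() step: replace null values with the marker.
    static List<String[]> replaceNulls(List<String[]> records) {
        List<String[]> out = new ArrayList<>();
        for (String[] record : records) {
            String value = (record[1] == null) ? DELETE_MARKER : record[1];
            out.add(new String[] {record[0], value});
        }
        return out;
    }

    // Models reading a topic as a table: upsert per key, null value deletes.
    static Map<String, String> readAsTable(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            if (record[0] == null) {
                continue;
            }
            if (record[1] == null) {
                table.remove(record[0]);
            } else {
                table.put(record[0], record[1]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
            new String[] {"a", "1"},
            new String[] {"b", "2"},
            new String[] {"a", null});

        // Plain reduce: the null for "a" is skipped, so "a" survives.
        System.out.println(reduceToTable(records, (o, n) -> n)); // {a=1, b=2}
        // With the surrogate marker, the reducer returns null and "a" is removed.
        System.out.println(reduceToTable(replaceNulls(records), LATEST_OR_DELETE)); // {b=2}
        // Table semantics: the null acts as a delete directly.
        System.out.println(readAsTable(records)); // {b=2}
    }
}
```

One consequence of the stated assumption: if the very first record for a key carries the marker, this model stores the marker itself as the value, so a real topology built this way would probably want to filter it out downstream.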
