Hi Patrik,

Just to drop one observation in... Streaming to a topic and then consuming
it as a table does create overhead, but so does reducing a stream to a
table, and I think it's actually the same in either case.

They both require a store to collect the table state, and in both cases,
the stores need to have a changelog topic. For the "reduce" version, it's
an internal changelog topic, and for the "topic-to-table" version, the
store can use the intermediate topic as its changelog.

This doesn't address your ergonomic concern, but it seemed worth pointing
out that, as far as I can tell, there is no difference in overhead between
the two approaches.
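To make the parity concrete: both routes ultimately maintain a key-value store whose contents can be rebuilt by replaying a changelog topic. The sketch below is plain Java (not the Kafka Streams API) and just models that mechanism; the record values and the list-as-changelog are made up for illustration. A `null` value acts as a tombstone, i.e. a delete.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch: whether the table comes from reduce() (internal
// changelog topic) or from consuming an intermediate topic as a table
// (topic doubles as the changelog), the store state is what you get by
// replaying the changelog. Here the "changelog" is just a list of records.
public class ChangelogSketch {

    static Map<String, String> replay(List<Map.Entry<String, String>> changelog) {
        Map<String, String> store = new HashMap<>();
        for (Map.Entry<String, String> record : changelog) {
            if (record.getValue() == null) {
                store.remove(record.getKey());   // tombstone: delete the key
            } else {
                store.put(record.getKey(), record.getValue());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = Arrays.asList(
            new AbstractMap.SimpleEntry<>("a", "1"),
            new AbstractMap.SimpleEntry<>("b", "2"),
            new AbstractMap.SimpleEntry<>("a", null)   // delete "a"
        );
        Map<String, String> store = replay(changelog);
        System.out.println(store);   // only "b" survives
    }
}
```

Either way, one extra topic exists and one store is materialized; the only difference is whether the topic is user-managed or internal.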

Hope this helps!
-John

On Fri, Oct 26, 2018 at 3:27 AM Patrik Kleindl <pklei...@gmail.com> wrote:

> Hello Matthias,
> thank you for the explanation.
> Streaming back to a topic and consuming this as a KTable does respect the
> null values as deletes, correct? But at the price of some overhead.
> Is there any (historical, technical or emotional;-)) reason that no simple
> one-step stream-to-table operation exists?
> Best regards
> Patrik
>
> > Am 26.10.2018 um 00:07 schrieb Matthias J. Sax <matth...@confluent.io>:
> >
> > Patrik,
> >
> > `null` values in a KStream don't have delete semantics (it's not a
> > changelog stream). That's why we drop them in the KStream#reduce
> > implementation.
> >
> > If you want to explicitly remove results for a key from the result
> > KTable, your `Reducer#apply()` implementation must return `null` -- the
> > result of #apply() has changelog/KTable semantics and `null` is
> > interpreted as delete for this case.
> >
> > If you want to use `null` from your KStream to trigger reduce() to
> > delete, you will need to use a surrogate value for this, i.e., do a
> > mapValues() before the groupByKey() call and replace `null` values with
> > the surrogate-delete-marker that you can evaluate in `Reducer#apply()`
> > to return `null` for this case.
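The surrogate-marker trick Matthias describes can be sketched in plain Java (this is not the Kafka Streams API; the marker value, names, and last-value-wins reducer are made up for illustration):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Conceptual sketch of the surrogate-delete-marker approach: null stream
// values are replaced with a marker (so reduce() doesn't drop them), and
// the reducer translates the marker back into a null result, which has
// changelog/KTable semantics, i.e. it deletes the key.
public class SurrogateDeleteSketch {

    static final String DELETE_MARKER = "__DELETE__";  // hypothetical surrogate

    // Simulates the mapValues() step: keep null records alive as a marker.
    static String mapValue(String value) {
        return value == null ? DELETE_MARKER : value;
    }

    // Simulates Reducer#apply(): return null on the marker to trigger a
    // delete (here simply last-value-wins for non-marker values).
    static final BinaryOperator<String> REDUCER =
        (aggregate, next) -> DELETE_MARKER.equals(next) ? null : next;

    // Drives mapped records through the reducer; a null reduction result
    // removes the key from the table, mirroring KTable delete semantics.
    static Map<String, String> process(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            String mapped = mapValue(r.getValue());
            String result = REDUCER.apply(table.get(r.getKey()), mapped);
            if (result == null) {
                table.remove(r.getKey());
            } else {
                table.put(r.getKey(), result);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, String> table = process(Arrays.asList(
            new AbstractMap.SimpleEntry<>("a", "1"),
            new AbstractMap.SimpleEntry<>("a", null)));  // null -> delete
        System.out.println(table.containsKey("a"));      // false
    }
}
```

In a real topology the same shape would apply: mapValues() to install the marker, groupByKey(), then reduce() with a Reducer whose apply() returns null when it sees the marker.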
> >
> > Hope this helps.
> >
> > -Matthias
> >
> >> On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> >> Hello
> >>
> >> Recently we noticed a lot of warning messages in the logs which pointed
> >> to this method (we are running 2.0):
> >>
> >> KStreamReduce
> >> public void process(final K key, final V value) {
> >>     // If the key or value is null we don't need to proceed
> >>     if (key == null || value == null) {
> >>         LOG.warn(
> >>             "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
> >>             key, value, context().topic(), context().partition(), context().offset()
> >>         );
> >>         metrics.skippedRecordsSensor().record();
> >>         return;
> >>     }
> >>
> >> This was triggered for every record from a stream with an existing key
> >> but a null value which we put through groupBy/reduce to get a KTable.
> >> My assumption was that this was the correct way inside a streams
> >> application to get a KTable but this prevents deletion of records from
> >> working.
> >>
> >> Our alternative is to send the stream back to a named topic and build a
> >> new table from it, but this is rather cumbersome and requires a separate
> >> topic which also can't be cleaned up by the streams reset tool.
> >>
> >> Did I miss anything relevant here?
> >> Would it be possible to create a separate method for KStream to achieve
> >> this directly?
> >>
> >> best regards
> >>
> >> Patrik
> >>
> >
>
