Patrik, `null` values in a KStream don't have delete semantics (it's not a changelog stream). That's why we drop them in the KStream#reduce implementation.
If you want to explicitly remove results for a key from the result KTable, your `Reducer#apply()` implementation must return `null`: the result of `#apply()` has changelog/KTable semantics, and `null` is interpreted as a delete in this case. If you want a `null` value from your KStream to make reduce() delete the key, you will need to use a surrogate value for it, i.e., do a mapValues() before the groupByKey() call and replace `null` values with a surrogate delete marker that you can check in `Reducer#apply()` so that it returns `null` in this case.

Hope this helps.

-Matthias

On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> Hello
>
> Recently we noticed a lot of warning messages in the logs which pointed to
> this method (we are running 2.0):
>
> KStreamReduce
> public void process(final K key, final V value) {
>     // If the key or value is null we don't need to proceed
>     if (key == null || value == null) {
>         LOG.warn(
>             "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
>             key, value, context().topic(), context().partition(), context().offset()
>         );
>         metrics.skippedRecordsSensor().record();
>         return;
>     }
>
> This was triggered for every record from a stream with an existing key but
> a null value which we put through groupBy/reduce to get a KTable.
> My assumption was that this was the correct way inside a streams
> application to get a KTable, but this prevents deletion of records from
> working.
>
> Our alternative is to send the stream back to a named topic and build a new
> table from it, but this is rather cumbersome and requires a separate topic
> which also can't be cleaned up by the streams reset tool.
>
> Did I miss anything relevant here?
> Would it be possible to create a separate method for KStream to achieve
> this directly?
>
> best regards
>
> Patrik
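For illustration, here is a plain-Java model of the semantics described in the reply. It is a simplification, not the Kafka Streams code, and it has no Kafka dependency: a `HashMap` stands in for the KTable's store, `process()` mirrors the null-skipping check in the quoted `KStreamReduce#process()` snippet, and `toSurrogate()` plays the role of the `mapValues()` step before `groupByKey()`. The class name, `DELETE_MARKER` value and helper methods are hypothetical. One detail follows the documented reduce() behavior: the reducer is not applied to the first value seen for a key, because that value becomes the initial aggregate as-is.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of KStream#groupByKey().reduce() semantics; NOT the
// Kafka Streams implementation. All names here are hypothetical.
class SurrogateDeleteModel {
    // Hypothetical surrogate; must never collide with a real payload.
    static final String DELETE_MARKER = "__DELETE__";

    // Stand-in for the result KTable's state store.
    final Map<String, String> table = new HashMap<>();
    int skipped = 0;

    // Reducer: the marker yields null (delete), any other value replaces the aggregate.
    final BinaryOperator<String> reducer =
        (agg, value) -> DELETE_MARKER.equals(value) ? null : value;

    // Models the mapValues() step: turn null values into the marker.
    static String toSurrogate(String value) {
        return value == null ? DELETE_MARKER : value;
    }

    // Models the reduce processor for one input record.
    void process(String key, String value) {
        // Same check as the quoted KStreamReduce code: null key or value is skipped.
        if (key == null || value == null) {
            skipped++;
            return;
        }
        String oldAgg = table.get(key);
        // The reducer only runs when an aggregate already exists for the key.
        String newAgg = (oldAgg == null) ? value : reducer.apply(oldAgg, value);
        if (newAgg == null) {
            table.remove(key); // a null reducer result acts as a delete
        } else {
            table.put(key, newAgg);
        }
    }
}
```

A plain `null` is skipped and the key survives, while the same `null` mapped to the marker removes it. Note the caveat the model exposes: a marker arriving for a key that has no current aggregate is stored as-is, so in a real topology you may also want a `filter()` on the resulting KTable that drops the marker value.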