ableegoldman commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r475791126
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {

     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));

Review comment:
Oh yeah, duh. Nevermind this 🙂

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org