mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652355524
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
private WindowStore<K, V2> otherWindowStore;
- private StreamsMetricsImpl metrics;
private Sensor droppedRecordsSensor;
private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
- @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
- metrics = (StreamsMetricsImpl) context.metrics();
+ final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
otherWindowStore = context.getStateStore(otherWindowName);
- if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
- outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+ if (enableSpuriousResultFix
Review comment:
After updating the code with the `else`, a few tests started to fail. The
issue is that for left/outer joins we _always_ set the store name (even if the
feature is disabled...) -- only for inner joins do we get an
`Optional.empty()`. Thus, we cannot actually verify the `else` case (i.e., that
we added a store even though we don't need it) at runtime. I guess we need to
rely on the added unit tests to cover this case instead.