guozhangwang commented on code in PR #18111:
URL: https://github.com/apache/kafka/pull/18111#discussion_r1877177357
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java:
##########
@@ -198,16 +158,6 @@ public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withOuterJoinWindowStoreBuilde
return this;
}
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoined(final
Joined<K, V1, V2> joined) {
Review Comment:
Just to clarify: this function was not used at all, even before this change?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -77,10 +85,21 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut,
VThis, VOther> impleme
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
this.joiner = joiner;
this.outer = outer;
- this.outerJoinWindowName = outerJoinWindowName;
+ this.outerJoinWindowStoreName =
outerJoinWindowStoreFactory.map(StoreFactory::storeName);
+ this.outerJoinWindowStoreFactory = outerJoinWindowStoreFactory;
this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ final Set<StoreBuilder<?>> stores = new HashSet<>();
+ stores.add(new FactoryWrappingStoreBuilder<>(otherWindowStoreFactory));
+ if (outerJoinWindowStoreFactory.isPresent() &&
enableSpuriousResultFix) {
Review Comment:
Just clarifying: so far, as long as `spuriousResultFixEnabled` is true, the
outer join should always be present since it is just for this fix, right? Maybe
worth adding a comment for future extensions if so.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -55,20 +61,22 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut,
VThis, VOther> impleme
private final long windowsAfterMs;
private final boolean outer;
- private final Optional<String> outerJoinWindowName;
+ private final Optional<String> outerJoinWindowStoreName;
Review Comment:
nit: Do we really need this additional Optional field? It seems we only use it in
https://github.com/apache/kafka/pull/18111/files#diff-6ca18143cc0226e6d1e4d5180ff81596a72d53639ca5184bf1238350265382a6R121
which could be derived from `outerJoinWindowStoreFactory` as well?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java:
##########
@@ -43,13 +38,9 @@ private StreamStreamJoinNode(final String nodeName,
final ProcessorParameters<K, V1, ?, ?>
joinThisProcessorParameters,
final ProcessorParameters<K, V2, ?, ?>
joinOtherProcessParameters,
final ProcessorParameters<K, VR, ?, ?>
joinMergeProcessorParameters,
- final ProcessorParameters<K, V1, ?, ?>
thisWindowedStreamProcessorParameters,
- final ProcessorParameters<K, V2, ?, ?>
otherWindowedStreamProcessorParameters,
- final StoreFactory thisStoreFactory,
- final StoreFactory otherStoreFactory,
+ final String thisWindowedStreamProcessorName,
+ final String otherWindowedStreamProcessorName,
final Optional<StoreFactory>
outerJoinStoreFactory,
- final Joined<K, V1, V2> joined,
- final boolean enableSpuriousResultFix,
final ProcessorParameters<K, V1, ?, ?>
selfJoinProcessorParameters) {
Review Comment:
nit: move this up right after the joinMergeProcessorParameters?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -209,19 +209,13 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final
KStream<K, V1> lhs,
joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
.withJoinThisProcessorParameters(joinThisProcessorParams)
.withJoinOtherProcessorParameters(joinOtherProcessorParams)
- .withThisWindowStoreBuilder(thisWindowStore)
- .withOtherWindowStoreBuilder(otherWindowStore)
-
.withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams)
-
.withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
+
.withThisWindowedStreamProcessorName(thisWindowStreamProcessorParams.processorName())
+
.withOtherWindowedStreamProcessorName(otherWindowStreamProcessorParams.processorName())
.withOuterJoinWindowStoreBuilder(outerJoinWindowStore)
Review Comment:
Can we move this store builder to processors like other store builders?
##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -1619,8 +1619,8 @@ public void
shouldWrapProcessorsForCoGroupedStreamAggregate() {
.toStream(Named.as("toStream"))// wrapped 4
.to("output", Produced.as("sink"));
- final var top = builder.build();
- System.out.println(top.describe());
Review Comment:
Ah I guess some debugging leftovers were overlooked -- thanks for cleaning
up!
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -38,15 +41,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther>
implements ProcessorSupplier<K, VThis, K, VOut> {
private static final Logger LOG =
LoggerFactory.getLogger(KStreamKStreamJoin.class);
- private final String otherWindowName;
+ private final String otherWindowStoreName;
+ private final StoreFactory otherWindowStoreFactory;
Review Comment:
nit: ditto for `otherWindowStoreName`, seems we can always get it from
`otherWindowStoreFactory` below?
##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -1816,6 +1816,105 @@ public void
shouldWrapProcessorsForStreamTableJoinWithGracePeriod() {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
+ @Test
+ public void
shouldWrapProcessorsForStreamStreamJoinWithSpuriousResultsFix() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input-1",
Consumed.as("source-1"));
+ final KStream<String, String> stream2 = builder.stream("input-2",
Consumed.as("source-2"));
+
+ stream1.join(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceAndGrace(Duration.ofDays(1),
Duration.ofDays(1)),
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+
+ // TODO: fix these names once we address
https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-JOINTHIS-0000000004", "KSTREAM-JOINOTHER-0000000005",
+ "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+ "KSTREAM-MERGE-0000000006"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(4));
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void
shouldWrapProcessorsForStreamStreamJoinWithoutSpuriousResultsFix() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input-1",
Consumed.as("source-1"));
+ final KStream<String, String> stream2 = builder.stream("input-2",
Consumed.as("source-2"));
+
+ stream1.join(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.of(Duration.ofDays(1)), // intentionally uses
deprecated version of this API!
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+
+ // TODO: fix these names once we address
https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-JOINTHIS-0000000004", "KSTREAM-JOINOTHER-0000000005",
+ "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+ "KSTREAM-MERGE-0000000006"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(4));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForStreamStreamSelfJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input",
Consumed.as("source"));
+
+ stream1.join(
+ stream1,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)),
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+
+ // TODO: fix these names once we address
https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-JOINTHIS-0000000003", "KSTREAM-JOINOTHER-0000000004",
+ "KSTREAM-WINDOWED-0000000001", "KSTREAM-WINDOWED-0000000002",
+ "KSTREAM-MERGE-0000000005"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
Review Comment:
+100. BTW could you file a JIRA for re-using the window store?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]