[ 
https://issues.apache.org/jira/browse/KAFKA-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13721:
---------------------------------------

    Assignee: Matthias J. Sax

> Left-join still emits spurious results in stream-stream joins in some cases
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-13721
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13721
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.0
>            Reporter: Nollet
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> Stream-stream joins still seem to emit spurious results for some window
> configurations.
> In my tests, it happened when setting {{before}} to 0 and using a grace period
> smaller than the window duration. More precisely, it seems to happen when
> {{before}} is set explicitly and the following holds:
> window duration > grace period + before
> h2. How to reproduce
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.TestInputTopic;
> import org.apache.kafka.streams.TestOutputTopic;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.TopologyTestDriver;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.KStream;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.time.Duration;
> import java.time.Instant;
> import java.util.Properties;
> public class SpuriousLeftJoinTest {
>     static final Duration WINDOW_DURATION = Duration.ofMinutes(10);
>     static final Duration GRACE = Duration.ofMinutes(6);
>     static final Duration BEFORE = Duration.ZERO;
>     static final String LEFT_TOPIC_NAME = "LEFT_TOPIC";
>     static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC";
>     static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC";
>     private static TopologyTestDriver testDriver;
>     private static TestInputTopic<String, Integer> inputTopicLeft;
>     private static TestInputTopic<String, Integer> inputTopicRight;
>     private static TestOutputTopic<String, Integer> outputTopic;
>     public static Topology createTopology() {
>         StreamsBuilder builder = new StreamsBuilder();
>         KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME);
>         KStream<String, Integer> rightStream = 
> builder.stream(RIGHT_TOPIC_NAME);
>         // return 1 if left join matched, otherwise 0
>         KStream<String, Integer> joined = leftStream.leftJoin(
>             rightStream,
>             (value1, value2) -> {
>                 if(value2 == null){
>                     return 0;
>                 }
>                 return 1;
>             },
>             JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE)
>                 .before(BEFORE)
>         );
>         joined.to(OUTPUT_TOPIC_NAME);
>         return builder.build();
>     }
>     @Before
>     public void setup() {
>         Topology topology = createTopology();
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.StringSerde.class);
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.IntegerSerde.class);
>         testDriver = new TopologyTestDriver(topology, props);
>         inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME, 
> Serdes.String().serializer(), Serdes.Integer().serializer());
>         inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME, 
> Serdes.String().serializer(), Serdes.Integer().serializer());
>         outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME, 
> Serdes.String().deserializer(), Serdes.Integer().deserializer());
>     }
>     @After
>     public void tearDown() {
>         testDriver.close();
>     }
>     @Test
>     public void shouldEmitOnlyOneMessageForKey1(){
>         Instant now = Instant.now();
>         inputTopicLeft.pipeInput("key1", 12, now);
>         inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION));
>         // send later record to increase stream time & close the window
>         inputTopicLeft.pipeInput("other_key", 1212122, 
> now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10));
>         while (! outputTopic.isEmpty()){
>             System.out.println(outputTopic.readKeyValue());
>         }
>     }
> }
> {code}
> The stdout of the code above is:
> {noformat}
> KeyValue(key1, 0)
> KeyValue(key1, 1)
> {noformat}
> However, it should be:
> {noformat}
> KeyValue(key1, 1)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to