Nollet created KAFKA-13721:
------------------------------

             Summary: Left-join still emit spurious results in stream-stream 
joins in some cases
                 Key: KAFKA-13721
                 URL: https://issues.apache.org/jira/browse/KAFKA-13721
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.1.0
            Reporter: Nollet


Stream-stream joins still seem to emit spurious results for some window 
configurations.

From my tests, it happens when setting before to 0 and having a grace period 
smaller than the window duration. More precisely, it seems to happen when 
before is set and 
window duration > grace period + before
h2. how to reproduce
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

/**
 * Reproduction for KAFKA-13721: a stream-stream left join configured with
 * {@code before = 0} and a grace period shorter than the window duration.
 *
 * <p>The test only prints whatever the join wrote to the output topic; it
 * makes no assertions, so the result has to be read from stdout.
 */
public class SpuriousLeftJoinTest {

    static final String LEFT_TOPIC_NAME = "LEFT_TOPIC";
    static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC";
    static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC";

    static final Duration WINDOW_DURATION = Duration.ofMinutes(10);
    static final Duration GRACE = Duration.ofMinutes(6);
    static final Duration BEFORE = Duration.ZERO;

    private static TopologyTestDriver testDriver;
    private static TestInputTopic<String, Integer> inputTopicLeft;
    private static TestInputTopic<String, Integer> inputTopicRight;
    private static TestOutputTopic<String, Integer> outputTopic;

    /**
     * Builds a topology that left-joins {@code LEFT_TOPIC} with
     * {@code RIGHT_TOPIC} and writes the joined stream to {@code OUTPUT_TOPIC}.
     *
     * @return the built topology
     */
    public static Topology createTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, Integer> left = streamsBuilder.stream(LEFT_TOPIC_NAME);
        final KStream<String, Integer> right = streamsBuilder.stream(RIGHT_TOPIC_NAME);

        final JoinWindows joinWindows = JoinWindows
            .ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE)
            .before(BEFORE);

        // The joined value is 1 when a right-side value is present, 0 when it is null.
        final KStream<String, Integer> joined = left.leftJoin(
            right,
            (leftValue, rightValue) -> rightValue == null ? 0 : 1,
            joinWindows
        );
        joined.to(OUTPUT_TOPIC_NAME);

        return streamsBuilder.build();
    }

    /** Creates the test driver plus the input and output test topics. */
    @Before
    public void setup() {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);

        testDriver = new TopologyTestDriver(createTopology(), config);

        inputTopicLeft = testDriver.createInputTopic(
            LEFT_TOPIC_NAME,
            Serdes.String().serializer(),
            Serdes.Integer().serializer());
        inputTopicRight = testDriver.createInputTopic(
            RIGHT_TOPIC_NAME,
            Serdes.String().serializer(),
            Serdes.Integer().serializer());
        outputTopic = testDriver.createOutputTopic(
            OUTPUT_TOPIC_NAME,
            Serdes.String().deserializer(),
            Serdes.Integer().deserializer());
    }

    /** Closes the test driver after each test. */
    @After
    public void tearDown() {
        testDriver.close();
    }

    /**
     * Pipes one left record and one right record for {@code key1}, the right
     * one stamped exactly {@code WINDOW_DURATION} later, then prints every
     * record found on the output topic.
     */
    @Test
    public void shouldEmitOnlyOneMessageForKey1() {
        final Instant leftTimestamp = Instant.now();
        final Instant rightTimestamp = leftTimestamp.plus(WINDOW_DURATION);
        final Instant closingTimestamp = rightTimestamp.plus(GRACE).plusSeconds(10);

        inputTopicLeft.pipeInput("key1", 12, leftTimestamp);
        inputTopicRight.pipeInput("key1", 13, rightTimestamp);

        // A later record on another key advances stream time and closes the window.
        inputTopicLeft.pipeInput("other_key", 1212122, closingTimestamp);

        printRemainingOutput();
    }

    /** Reads the output topic until it is empty, printing each key-value pair. */
    private static void printRemainingOutput() {
        while (true) {
            if (outputTopic.isEmpty()) {
                return;
            }
            System.out.println(outputTopic.readKeyValue());
        }
    }
}
{code}
The stdout of the code above is:
{noformat}
KeyValue(key1, 0)
KeyValue(key1, 1)
{noformat}
However, it should be:
{noformat}
KeyValue(key1, 1)
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to