Bruno Cadonna created KAFKA-14862:
-------------------------------------

             Summary: Outer stream-stream join does not output all results with multiple input partitions
                 Key: KAFKA-14862
                 URL: https://issues.apache.org/jira/browse/KAFKA-14862
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Bruno Cadonna


If I run the following Streams app once with two input topics that each have one
partition, and then again with two input topics that each have two partitions, I
get different results.
  
{code:java}
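import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.StreamJoined;

// builder is a StreamsBuilder; leftSideTopic and rightSideTopic are the input topic names.
final KStream<String, String> leftSide = builder.stream(leftSideTopic);
final KStream<String, String> rightSide = builder.stream(rightSideTopic);

// Outer join with a 20 s time difference and a 10 s grace period.
final KStream<String, String> leftAndRight = leftSide.outerJoin(
    rightSide,
    (leftValue, rightValue) ->
        (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
    JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(20), Duration.ofSeconds(10)),
    StreamJoined.with(
        Serdes.String(), /* key */
        Serdes.String(), /* left value */
        Serdes.String()  /* right value */
    )
);
leftAndRight.print(Printed.toSysOut());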
{code}
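To see the values the joiner above produces without running a cluster, the lambda can be exercised on its own. This is a minimal sketch that uses a plain {{java.util.function.BiFunction}} as a stand-in for the Kafka Streams {{ValueJoiner}}; the class name {{JoinerSketch}} is hypothetical and nothing here is Kafka Streams API:

{code:java}
import java.util.function.BiFunction;

// Hypothetical helper: a stand-in for the ValueJoiner lambda passed to outerJoin,
// with no Kafka Streams dependency.
final class JoinerSketch {
    static final BiFunction<String, String, String> JOINER =
        (leftValue, rightValue) ->
            (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue;

    public static void main(String[] args) {
        // Left record with no matching right record within the window.
        System.out.println(JOINER.apply("0", null)); // prints 0/NOTPRESENT
        // Left record that found a matching right record.
        System.out.println(JOINER.apply("0", "a")); // prints 0/a
    }
}
{code}

{{Printed.toSysOut()}} prints each result as {{[<processor name>]: key, value}}, which is why the results appear in the form {{0, 0/NOTPRESENT}}.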

To reproduce, produce the following batch of records twice, waiting longer than
the join time difference plus the grace period (i.e., more than 30 seconds)
between the two batches:
{code}
(0, 0)
(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
(6, 6)
(7, 7)
(8, 8)
(9, 9)
{code}
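The 30-second figure comes from the join window: {{JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(20), Duration.ofSeconds(10))}} allows a match up to 20 seconds away from a record, and late records are still accepted for a further 10 seconds of grace. An unmatched left record can only be emitted as {{/NOTPRESENT}} once stream time has moved past both, so the second batch presumably serves to advance stream time far enough for the first batch's results to be emitted. Below is a simplified sketch of that condition, assuming stream time is the largest record timestamp seen so far and that emission happens once it strictly exceeds record time + time difference + grace. It models the behavior described in this report and is not Kafka Streams' internal implementation; the class and method names are hypothetical:

{code:java}
import java.time.Duration;

// Hypothetical, simplified model of when an unmatched left record of an outer
// stream-stream join can produce a "/NOTPRESENT" result. Not Kafka Streams code;
// the strict ">" boundary is an assumption taken from the "> 30 seconds" above.
final class OuterJoinWindowSketch {
    static final Duration TIME_DIFFERENCE = Duration.ofSeconds(20);
    static final Duration GRACE = Duration.ofSeconds(10);

    // streamTimeMs is modeled as the largest record timestamp observed so far.
    static boolean canEmitNonJoined(long recordTimestampMs, long streamTimeMs) {
        long windowCloseMs = recordTimestampMs + TIME_DIFFERENCE.toMillis() + GRACE.toMillis();
        return streamTimeMs > windowCloseMs;
    }

    public static void main(String[] args) {
        long firstBatchTs = 0L;
        // Second batch 25 s later: window + grace (30 s) has not passed yet.
        System.out.println(canEmitNonJoined(firstBatchTs, 25_000L)); // false
        // Second batch 31 s later: the first batch's windows are closed.
        System.out.println(canEmitNonJoined(firstBatchTs, 31_000L)); // true
    }
}
{code}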

With one partition per input topic, I get:
{code}
[KSTREAM-PROCESSVALUES-0000000008]: 0, 0/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 2, 2/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 5, 5/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 6, 6/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
{code}

With two partitions per input topic, I get:
{code}
[KSTREAM-PROCESSVALUES-0000000008]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-0000000008]: 9, 9/NOTPRESENT
{code}

I would expect to get the same set of records, maybe in a different order due 
to the partitioning.
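Comparing the two outputs as sets, so that ordering does not matter, shows which results are missing in the two-partition run. A minimal sketch, with the keys copied from the two output listings above; the class name {{OutputDiffSketch}} is hypothetical:

{code:java}
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: order-insensitive comparison of the keys printed in the
// one-partition run and the two-partition run reported above.
final class OutputDiffSketch {
    static final List<Integer> ONE_PARTITION = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
    static final List<Integer> TWO_PARTITIONS = List.of(1, 3, 4, 7, 8, 9);

    // Keys present in the expected output but absent from the actual output.
    static Set<Integer> missing(List<Integer> expected, List<Integer> actual) {
        Set<Integer> result = new TreeSet<>(expected);
        result.removeAll(actual);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(missing(ONE_PARTITION, TWO_PARTITIONS)); // [0, 2, 5, 6]
    }
}
{code}

The results for keys 0, 2, 5 and 6 are absent altogether, so the difference is lost results rather than a reordering.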



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
