mjsax commented on code in PR #19005: URL: https://github.com/apache/kafka/pull/19005#discussion_r2026065811
########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ########## @@ -232,57 +236,134 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin, left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5); { assertThat( - outputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)") + outputTopic.readKeyValuesToList(), + is(List.of( + new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)") )) ); if (rejoin) { assertThat( - rejoinOutputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)") + rejoinOutputTopic.readKeyValuesToList(), + is(List.of( + new KeyValue<>("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)") )) ); } if (materialized) { assertThat( - asMap(store), - is(mkMap( - mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)") + makeList(store), Review Comment: as above ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ########## @@ -232,57 +236,134 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin, left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5); { assertThat( - outputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)") + outputTopic.readKeyValuesToList(), + is(List.of( + new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)") )) ); if (rejoin) { assertThat( - rejoinOutputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)") + rejoinOutputTopic.readKeyValuesToList(), + is(List.of( + new KeyValue<>("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)") )) ); } if (materialized) { assertThat( - asMap(store), - is(mkMap( - mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)") + makeList(store), + is(List.of( + new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)"), + new KeyValue<>("lhs2", "(lhsValue2|rhs2,rhsValue2)"), + new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)") )) ); } } + // Now delete one LHS entity such that one delete is propagated down to the output. left.pipeInput("lhs1", null, baseTimestamp + 6); assertThat( - outputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs1", null) + outputTopic.readKeyValuesToList(), + is(List.of( + new KeyValue<>("lhs1", null) )) ); if (rejoin) { assertThat( - rejoinOutputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs1", null) + rejoinOutputTopic.readKeyValuesToList(), + hasItem( + KeyValue.pair("lhs1", null)) + ); + } + if (materialized) { + assertThat( + makeList(store), + is(List.of( + new KeyValue<>("lhs2", "(lhsValue2|rhs2,rhsValue2)"), + new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)") )) ); } + } + + } + + + @ParameterizedTest + @MethodSource("testCases") + public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean leftJoin, + final String optimization, Review Comment: nit formatting: should have same indent as first parameter: ``` public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean leftJoin, final String optimization, ``` Same for other parameters below. ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ########## @@ -232,57 +236,134 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin, left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5); { assertThat( - outputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)") + outputTopic.readKeyValuesToList(), + is(List.of( + new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)") )) ); if (rejoin) { assertThat( - rejoinOutputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)") + rejoinOutputTopic.readKeyValuesToList(), + is(List.of( + new KeyValue<>("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)") )) ); } if (materialized) { assertThat( - asMap(store), - is(mkMap( - mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), - mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)") + makeList(store), + is(List.of( + new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)"), + new KeyValue<>("lhs2", "(lhsValue2|rhs2,rhsValue2)"), + new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)") )) ); } } + // Now delete one LHS entity such that one delete is propagated down to the output. left.pipeInput("lhs1", null, baseTimestamp + 6); assertThat( - outputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs1", null) + outputTopic.readKeyValuesToList(), + is(List.of( + new KeyValue<>("lhs1", null) )) ); if (rejoin) { assertThat( - rejoinOutputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs1", null) + rejoinOutputTopic.readKeyValuesToList(), + hasItem( + KeyValue.pair("lhs1", null)) + ); + } + if (materialized) { + assertThat( + makeList(store), + is(List.of( Review Comment: as above ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ########## @@ -203,26 +207,26 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin, left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4); { - final Map<String, String> expected = mkMap( - mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), - mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)") + final List<KeyValue<String, String>> expected = Arrays.asList( + KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"), + KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)") ); assertThat( - outputTopic.readKeyValuesToMap(), + outputTopic.readKeyValuesToList(), is(expected) ); if (rejoin) { assertThat( - rejoinOutputTopic.readKeyValuesToMap(), - is(mkMap( - mkEntry("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"), - mkEntry("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)") + rejoinOutputTopic.readKeyValuesToList(), + is(asList( + KeyValue.pair("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"), + KeyValue.pair("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)") )) ); } if (materialized) { assertThat( - asMap(store), + makeList(store), is(expected) Review Comment: This seems to be potentially brittle? The problem is, we don't have any guarantee in what order the `store` would add the record to the list, right? And it could change. So for checking the store content it seems we should keep using `Map`, and only switch to `List` when verifying the result topic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org