mjsax commented on code in PR #19005:
URL: https://github.com/apache/kafka/pull/19005#discussion_r2026065811


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -232,57 +236,134 @@ public void doJoinFromLeftThenDeleteLeftEntity(final 
boolean leftJoin,
             left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
             {
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    outputTopic.readKeyValuesToList(),
+                    is(List.of(
+                        new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
                     ))
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(List.of(
+                            new KeyValue<>("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
                         ))
                     );
                 }
                 if (materialized) {
                     assertThat(
-                        asMap(store),
-                        is(mkMap(
-                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                            mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                        makeList(store),

Review Comment:
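   Same concern as in my comment on the earlier `makeList(store)` assertion:
   the store's iteration order is not guaranteed, so store content should keep
   being compared as a `Map`.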



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -232,57 +236,134 @@ public void doJoinFromLeftThenDeleteLeftEntity(final 
boolean leftJoin,
             left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
             {
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    outputTopic.readKeyValuesToList(),
+                    is(List.of(
+                        new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
                     ))
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(List.of(
+                            new KeyValue<>("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
                         ))
                     );
                 }
                 if (materialized) {
                     assertThat(
-                        asMap(store),
-                        is(mkMap(
-                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                            mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                        makeList(store),
+                        is(List.of(
+                            new KeyValue<>("lhs1", 
"(lhsValue1|rhs1,rhsValue1)"),
+                            new KeyValue<>("lhs2", 
"(lhsValue2|rhs2,rhsValue2)"),
+                            new KeyValue<>("lhs3", 
"(lhsValue3|rhs1,rhsValue1)")
                         ))
                     );
                 }
             }
+
             // Now delete one LHS entity such that one delete is propagated 
down to the output.
 
             left.pipeInput("lhs1", null, baseTimestamp + 6);
             assertThat(
-                outputTopic.readKeyValuesToMap(),
-                is(mkMap(
-                    mkEntry("lhs1", null)
+                outputTopic.readKeyValuesToList(),
+                is(List.of(
+                    new KeyValue<>("lhs1", null)
                 ))
             );
             if (rejoin) {
                 assertThat(
-                    rejoinOutputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs1", null)
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    hasItem(
+                        KeyValue.pair("lhs1", null))
+                );
+            }
+            if (materialized) {
+                assertThat(
+                    makeList(store),
+                    is(List.of(
+                        new KeyValue<>("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
+                        new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
                     ))
                 );
             }
+        }
+
+    }
+
+
+    @ParameterizedTest
+    @MethodSource("testCases")
+    public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean 
leftJoin,
+        final String optimization,

Review Comment:
   nit (formatting): continuation parameters should have the same indent as the first parameter:
   ```
       public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean leftJoin,
                                                            final String optimization,
   ```
   
   Same for other parameters below.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -232,57 +236,134 @@ public void doJoinFromLeftThenDeleteLeftEntity(final 
boolean leftJoin,
             left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
             {
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                    outputTopic.readKeyValuesToList(),
+                    is(List.of(
+                        new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
                     ))
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(List.of(
+                            new KeyValue<>("lhs3", 
"rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
                         ))
                     );
                 }
                 if (materialized) {
                     assertThat(
-                        asMap(store),
-                        is(mkMap(
-                            mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                            mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"),
-                            mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                        makeList(store),
+                        is(List.of(
+                            new KeyValue<>("lhs1", 
"(lhsValue1|rhs1,rhsValue1)"),
+                            new KeyValue<>("lhs2", 
"(lhsValue2|rhs2,rhsValue2)"),
+                            new KeyValue<>("lhs3", 
"(lhsValue3|rhs1,rhsValue1)")
                         ))
                     );
                 }
             }
+
             // Now delete one LHS entity such that one delete is propagated 
down to the output.
 
             left.pipeInput("lhs1", null, baseTimestamp + 6);
             assertThat(
-                outputTopic.readKeyValuesToMap(),
-                is(mkMap(
-                    mkEntry("lhs1", null)
+                outputTopic.readKeyValuesToList(),
+                is(List.of(
+                    new KeyValue<>("lhs1", null)
                 ))
             );
             if (rejoin) {
                 assertThat(
-                    rejoinOutputTopic.readKeyValuesToMap(),
-                    is(mkMap(
-                        mkEntry("lhs1", null)
+                    rejoinOutputTopic.readKeyValuesToList(),
+                    hasItem(
+                        KeyValue.pair("lhs1", null))
+                );
+            }
+            if (materialized) {
+                assertThat(
+                    makeList(store),
+                    is(List.of(

Review Comment:
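   Same concern as in my comment on the earlier `makeList(store)` assertion:
   the store's iteration order is not guaranteed, so store content should keep
   being compared as a `Map`.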



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -203,26 +207,26 @@ public void doJoinFromLeftThenDeleteLeftEntity(final 
boolean leftJoin,
             left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
 
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                    mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
+                final List<KeyValue<String, String>> expected = Arrays.asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                    KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)")
                 );
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
+                    outputTopic.readKeyValuesToList(),
                     is(expected)
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
-                            mkEntry("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(asList(
+                            KeyValue.pair("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+                            KeyValue.pair("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
                         ))
                     );
                 }
                 if (materialized) {
                     assertThat(
-                        asMap(store),
+                        makeList(store),
                         is(expected)

Review Comment:
   This seems potentially brittle. The problem is that we don't have any
guarantee about the order in which the `store` would add the records to the
list, right? And that order could change. So for checking the store content, it
seems we should keep using `Map`, and only switch to `List` when verifying the
result topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to