vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617392150
I also took a look at the foreign-key join test, which is actually telling us something awesome: your feature allows us to drop _unnecessary_ tombstones that we'd otherwise send under some conditions.

Anyway, it's complicated, so here's a fix for the test:

```diff
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -48,6 +48,7 @@ import java.util.Properties;
 import java.util.function.Function;
 
 import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
@@ -371,12 +372,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
             // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
             // it's not possible to know whether a result was previously emitted.
+            // HOWEVER, when the final join result is materialized (either explicitly or
+            // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
             // For the left join, the tombstone is necessary.
             left.pipeInput("lhs1", (String) null);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
-                    is(mkMap(mkEntry("lhs1", null)))
+                    is(leftJoin || !(materialized || rejoin)
+                        ? mkMap(mkEntry("lhs1", null))
+                        : emptyMap())
                 );
                 if (materialized) {
                     assertThat(
@@ -452,12 +457,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
             // "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
             // since it impossible to know whether the prior FK existed or not (and thus whether any results have
-            // previously been emitted)
+            // previously been emitted). HOWEVER, when the final join result is materialized (either explicitly or
+            // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
             // The left join emits a _necessary_ update (since the lhs record has actually changed)
             left.pipeInput("lhs1", "lhsValue1|rhs2");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
+                is(leftJoin
+                    ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
+                    : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
             );
             if (materialized) {
                 assertThat(
@@ -469,7 +477,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs1", "lhsValue1|rhs3");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
+                is(leftJoin
+                    ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
+                    : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
             );
             if (materialized) {
                 assertThat(
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org