vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617392150


   I also took a look at the foreign-key join test, which is actually telling 
us something awesome: your feature allows us to drop _unnecessary_ tombstones 
that we'd otherwise send under some conditions.
   
   Anyway, it's complicated, so here's a fix for the test:
   ```diff
   --- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
   +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
   @@ -48,6 +48,7 @@ import java.util.Properties;
    import java.util.function.Function;
    
    import static java.util.Collections.emptyMap;
   +import static java.util.Collections.singletonMap;
    import static org.apache.kafka.common.utils.Utils.mkEntry;
    import static org.apache.kafka.common.utils.Utils.mkMap;
    import static org.apache.kafka.common.utils.Utils.mkProperties;
   @@ -371,12 +372,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest 
{
    
                // Deleting a non-joining record produces an unnecessary 
tombstone for inner joins, because
                // it's not possible to know whether a result was previously 
emitted.
   +            // HOWEVER, when the final join result is materialized (either 
explicitly or
   +            // implicitly by a subsequent join), we _can_ detect that the 
tombstone is unnecessary and drop it.
                // For the left join, the tombstone is necessary.
                left.pipeInput("lhs1", (String) null);
                {
                    assertThat(
                        outputTopic.readKeyValuesToMap(),
   -                    is(mkMap(mkEntry("lhs1", null)))
   +                    is(leftJoin || !(materialized || rejoin)
   +                           ? mkMap(mkEntry("lhs1", null))
   +                           : emptyMap())
                    );
                    if (materialized) {
                        assertThat(
   @@ -452,12 +457,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest 
{
                }
                // "moving" our subscription to another non-existent FK results 
in an unnecessary tombstone for inner join,
                // since it impossible to know whether the prior FK existed or 
not (and thus whether any results have
   -            // previously been emitted)
   +            // previously been emitted). HOWEVER, when the final join 
result is materialized (either explicitly or
   +            // implicitly by a subsequent join), we _can_ detect that the 
tombstone is unnecessary and drop it.
                // The left join emits a _necessary_ update (since the lhs 
record has actually changed)
                left.pipeInput("lhs1", "lhsValue1|rhs2");
                assertThat(
                    outputTopic.readKeyValuesToMap(),
   -                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" 
: null)))
   +                is(leftJoin
   +                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
   +                       : (materialized || rejoin) ? emptyMap() : 
singletonMap("lhs1", null))
                );
                if (materialized) {
                    assertThat(
   @@ -469,7 +477,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                left.pipeInput("lhs1", "lhsValue1|rhs3");
                assertThat(
                    outputTopic.readKeyValuesToMap(),
   -                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" 
: null)))
   +                is(leftJoin
   +                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
   +                       : (materialized || rejoin) ? emptyMap() : 
singletonMap("lhs1", null))
                );
                if (materialized) {
                    assertThat(
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to