[ https://issues.apache.org/jira/browse/KAFKA-18713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

tuna b updated KAFKA-18713:
---------------------------
    Description: 
There seems to be an issue when performing a left join: the latest value of 
the join does not contain the right side of the table.
{code:java}
var builder = new StreamsBuilder();

KTable<String, Department> departments = builder
    .table("departments",
        Materialized.<String, Department>as(Stores.persistentKeyValueStore("departments"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Department.class)));

KTable<String, Person> persons = builder
    .table("persons",
        Materialized.<String, Person>as(Stores.persistentKeyValueStore("persons"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Person.class)));

KTable<String, Person> joined = persons
    .leftJoin(departments,
        Person::getDepartmentId,
        (person, department) -> person.toBuilder()
            .department(department)
            .build(),
        TableJoined.as("my-joiner"),
        Materialized.<String, Person>as(Stores.persistentKeyValueStore("joined-results"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Person.class)));

joined
    .toStream()
    .to("joined-results", Produced.with(Serdes.String(), CustomJsonSerde.json(Person.class)));
{code}
How to reproduce:

Create two topics, persons and departments, each with 10 partitions. 
Pre-populate the departments topic with 2 departments.

Observation:
 * When I initially produce a Person {{p-1}} with FK {{dep-1}}, the join works.
 ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
 * When I change the FK to {{dep-2}}, the join updates.
 ** output is an EnrichedResult with person {{p-1}} and department {{dep-2}}
 * When I change the FK back to {{dep-1}}, the join fails.
 ** output is an EnrichedResult with person {{p-1}} *but no department*
 * However, if I produce the same event again ({{p-1}} with {{dep-1}}), the 
join works again.
 ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
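
For reference, the result expected from a left join at each step can be 
sketched as a plain lookup against the departments table. This is only an 
illustrative model, not Kafka Streams code: the class {{ExpectedLeftJoin}} and 
the method {{expectedOutputs}} are hypothetical names, and departments are 
reduced to their ids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the expected left-join result; not Kafka Streams code.
final class ExpectedLeftJoin {

    // For each FK update of one person, returns "personId -> departmentId",
    // where the right side is null only if the department does not exist.
    static List<String> expectedOutputs(Map<String, String> departments,
                                        String personId,
                                        List<String> foreignKeys) {
        List<String> outputs = new ArrayList<>();
        for (String fk : foreignKeys) {
            // Left join: the person is always emitted, the department comes from a lookup by FK.
            String department = departments.get(fk);
            outputs.add(personId + " -> " + department);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, String> departments = new HashMap<>();
        departments.put("dep-1", "dep-1");
        departments.put("dep-2", "dep-2");

        // FK sequence from the observation: dep-1, dep-2, back to dep-1.
        List<String> outputs =
            expectedOutputs(departments, "p-1", List.of("dep-1", "dep-2", "dep-1"));
        outputs.forEach(System.out::println);
    }
}
```

Under this model the third update should yield {{p-1}} with department 
{{dep-1}}, which is the step where the observed output has no department.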

Also, even when the FK is not set back to a previous value, the left join can 
still produce a wrong result. Changing an FK results in an insert and a delete 
operation, and sometimes the result of the delete is emitted after the result 
of the insert.

How to reproduce:
Create a departments topic and pre-populate it with 5 departments (dep-1 to 
dep-5). Create a persons topic and create person p-1 with FK dep-1. Send an 
update to the persons topic that changes the FK to dep-2, and repeat this step 
for each department up to dep-5. The latest emitted value for the person then 
does not contain a department.
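
The ordering problem described above can be sketched with a small model in 
plain Java. It only assumes that the downstream table keeps whichever result 
arrives last for a key. It does not reproduce how Kafka Streams processes 
foreign-key joins internally, and {{EmitOrderModel}} and {{latestDepartment}} 
are hypothetical names.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical model of result ordering; not the Kafka Streams implementation.
final class EmitOrderModel {

    // Each element is the department carried by one emitted join result for the
    // same person; an empty Optional models the result of the delete on the old FK.
    // The last result to arrive is the value that downstream consumers keep.
    static Optional<String> latestDepartment(List<Optional<String>> arrivalOrder) {
        return arrivalOrder.get(arrivalOrder.size() - 1);
    }

    public static void main(String[] args) {
        Optional<String> fromDelete = Optional.empty();
        Optional<String> fromInsert = Optional.of("dep-2");

        // Delete result first, insert result last: the department is present.
        System.out.println(latestDepartment(List.of(fromDelete, fromInsert)).isPresent()); // prints true

        // Insert result first, delete result last: the department is missing.
        System.out.println(latestDepartment(List.of(fromInsert, fromDelete)).isPresent()); // prints false
    }
}
```

The second case corresponds to the reported symptom: the final value for the 
person has no department even though the current FK points to an existing one.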

How to reproduce: 

  was:
There seems to be an issue when performing a left join: the latest value of 
the join is not emitted.


> Kafka Streams Left-Join not always emitting the last value
> ----------------------------------------------------------
>
>                 Key: KAFKA-18713
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18713
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.8.0
>            Reporter: tuna b
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
