Flaviu Cicio created FLINK-33989:
------------------------------------

             Summary: Insert Statement With Filter Operation Generates Extra Tombstone in Kafka
                 Key: FLINK-33989
                 URL: https://issues.apache.org/jira/browse/FLINK-33989
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.17.2
            Reporter: Flaviu Cicio
Given the following Flink SQL tables:
{code:sql}
CREATE TABLE input (
  id STRING NOT NULL,
  current_value STRING NOT NULL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'input',
  'key.format' = 'raw',
  'properties.bootstrap.servers' = 'sn-kafka:29092',
  'properties.group.id' = 'your_group_id',
  'value.format' = 'json'
);

CREATE TABLE output (
  id STRING NOT NULL,
  current_value STRING NOT NULL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'output',
  'key.format' = 'raw',
  'properties.bootstrap.servers' = 'sn-kafka:29092',
  'properties.group.id' = 'your_group_id',
  'value.format' = 'json'
);
{code}

And the following entries are present in the input Kafka topic:
{code:json}
[
  { "id": "1", "current_value": "abc" },
  { "id": "1", "current_value": "abcd" }
]
{code}

If we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input;
{code}
the following entries are published to the output Kafka topic:
{code:json}
[
  { "id": "1", "current_value": "abc" },
  { "id": "1", "current_value": "abcd" }
]
{code}

But if we execute the following statement:
{code:sql}
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1');
{code}
the following entries are published:
{code:json}
[
  { "id": "1", "current_value": "abc" },
  null,
  { "id": "1", "current_value": "abcd" }
]
{code}

We would expect both INSERT statements to produce the same output. Instead, the second statement generates an extra tombstone (the null value) between the two records.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
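One way to narrow down where the extra record comes from is to compare the changelog modes of the two plans. This is a sketch using Flink SQL's {{EXPLAIN ... CHANGELOG_MODE}} detail; the expectation (an assumption to be verified, not confirmed behavior) is that the filtered plan emits UPDATE_BEFORE (-U) rows, which the upsert-kafka sink would encode as tombstones, while the unfiltered plan keeps pure upsert semantics:
{code:sql}
-- Show the changelog mode of each operator in the unfiltered plan.
EXPLAIN CHANGELOG_MODE
INSERT INTO output SELECT id, current_value FROM input;

-- Show the changelog mode of the filtered plan; if -U rows appear here,
-- they are the likely source of the extra tombstone in the output topic.
EXPLAIN CHANGELOG_MODE
INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1');
{code}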