Zhiwen Sun created FLINK-27663:
----------------------------------

             Summary: upsert-kafka can't process delete message from upsert-kafka sink
                 Key: FLINK-27663
                 URL: https://issues.apache.org/jira/browse/FLINK-27663
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.14.4, 1.13.6, 1.15.0
            Reporter: Zhiwen Sun

The upsert-kafka sink writes DELETE records as Kafka messages with null values (a tombstone for the key). But when upsert-kafka is used as a source table to consume Kafka messages written by an upsert-kafka sink, the DELETE messages are ignored.

Related SQL:

{code:sql}
create table order_system_log(
  id bigint,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test_use',
  'properties.bootstrap.servers' = 'your broker',
  'properties.group.id' = 'your group id',
  'value.json.fail-on-missing-field' = 'false',
  'value.json.ignore-parse-errors' = 'true',
  'key.json.fail-on-missing-field' = 'false',
  'key.json.ignore-parse-errors' = 'true',
  'key.format' = 'json',
  'value.format' = 'json'
);

select * from order_system_log;
{code}

The problem may be caused by DeserializationSchema#deserialize: this method does not collect a record when the subclass's deserialize returns null.
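
The suspected behavior can be illustrated with a minimal, Flink-free sketch. It only models the hypothesis stated above and is not a confirmed root cause. `SimpleDeserializer`, `Utf8Deserializer`, and `emitted` are hypothetical names introduced for illustration; they are simplified stand-ins, not Flink's actual classes. The collector-style method forwards a record only when the single-record method returns a non-null result, so a null-valued (tombstone) message produces no output.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class TombstoneSketch {

    /** Hypothetical stand-in for a deserialization schema; not Flink's actual interface. */
    interface SimpleDeserializer<T> {
        // Returns null when the message carries no decodable value.
        T deserialize(byte[] message);

        // Collector-style variant: forwards the result only when it is non-null.
        default void deserialize(byte[] message, Consumer<T> out) {
            T record = deserialize(message);
            if (record != null) {
                out.accept(record);
            }
        }
    }

    /** Toy value format: decodes bytes as UTF-8 text; a null value (tombstone) maps to null. */
    static final class Utf8Deserializer implements SimpleDeserializer<String> {
        @Override
        public String deserialize(byte[] message) {
            return message == null ? null : new String(message, StandardCharsets.UTF_8);
        }
    }

    /** Runs each Kafka-style message value through the deserializer and returns what was emitted. */
    static List<String> emitted(List<byte[]> values) {
        SimpleDeserializer<String> deserializer = new Utf8Deserializer();
        List<String> out = new ArrayList<>();
        for (byte[] value : values) {
            deserializer.deserialize(value, out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> values = Arrays.asList(
                "{\"id\":1}".getBytes(StandardCharsets.UTF_8), // upsert for key 1
                null);                                          // tombstone (DELETE) for key 1
        // Only the upsert is emitted; the tombstone is silently skipped.
        System.out.println(emitted(values));
    }
}
```

In this sketch, a source that needs to surface deletes has to check for a null value before delegating to the value format, because the collector-style method alone cannot distinguish a tombstone from a message that was skipped.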