Hi Joice,

Maybe you can try a Print sink to verify the output of the Paimon SQL.
Best,
Jingsong

On Tue, Oct 21, 2025 at 2:11 PM Joice Jacob <[email protected]> wrote:
>
> Hi everyone, I'm new to Apache Paimon, and I'm trying to build a simple
> pipeline where data is read from a file source into a Flink table, written to
> a Paimon table, and then alerts are pushed to a Kafka topic.
>
> Paimon Table Definition:
>
> CREATE TABLE paimon_target_table (
>     msisdn STRING,
>     data_usage INT,
>     status BOOLEAN,
>     PRIMARY KEY (msisdn) NOT ENFORCED
> ) WITH (
>     'connector' = 'paimon',
>     'merge-engine' = 'aggregation',
>     'fields.data_usage.aggregate-function' = 'sum',
>     'fields.status.aggregate-function' = 'last_non_null_value',
>     'changelog-producer' = 'lookup'
> );
>
> Kafka Sink Table:
>
> CREATE TEMPORARY TABLE kafka_alert_sink (
>     msisdn STRING,
>     data_usage INT,
>     alert_time TIMESTAMP(3),
>     PRIMARY KEY (msisdn) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'topic' = 'data_usage_kafka',
>     'properties.bootstrap.servers' = '10.0.3.6:9094',
>     'key.format' = 'json',
>     'value.format' = 'json'
> );
>
> Job to Push Alerts to Kafka:
>
> INSERT INTO kafka_alert_sink
> SELECT
>     p.msisdn,
>     p.data_usage,
>     CURRENT_TIMESTAMP AS alert_time
> FROM paimon_target_table /*+ OPTIONS('scan.mode'='latest-full') */ p
> WHERE p.data_usage > 80
>     AND (p.status = FALSE OR p.status IS NULL)
>     AND p.data_usage IS NOT NULL;
>
> Test Case
>
> 1. First, I ran the following inserts:
>
> INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('125', 75);
> INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('125', 75);
>
> Kafka Output:
>
> {"msisdn":"125","data_usage":150,"alert_time":"2025-10-15 12:51:04.601"}
>
> 2. Then I updated the record's status:
>
> INSERT INTO paimon_target_table(msisdn, status) VALUES ('125', TRUE);
>
> Kafka Output:
>
> null
>
> 3. Next, I inserted another data_usage update:
>
> INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('128', 175);
>
> Kafka Output:
>
> {"msisdn":"128","data_usage":175,"alert_time":"2025-10-15 12:51:04.601"}
>
> 4. Finally, I inserted another data_usage update:
>
> INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('128', 25);
>
> Kafka Output:
>
> null
> {"msisdn":"128","data_usage":200,"alert_time":"2025-10-15 12:51:34.616"}
>
> Why are these null messages being produced in the Kafka sink whenever the
> Paimon table is updated (especially after changing the status field)?
> How can I prevent or filter out these null records from being emitted to
> Kafka?
>
> Thanks in advance.
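[For readers of the archive: the Print-sink suggestion above can be sketched as the Flink SQL below. This is a minimal illustration, assuming the table names from the original question; the sink name print_sink is hypothetical. Flink's built-in Print connector writes each changelog row, including its +I/-U/+U/-D row kind, to the TaskManager logs, which makes it easy to see whether the query before the upsert-kafka sink is emitting retraction (-U/-D) rows. Those retraction rows are what the upsert-kafka sink encodes as records with a null value.]

-- Hypothetical Print sink for inspecting the changelog produced by the query
CREATE TEMPORARY TABLE print_sink (
    msisdn STRING,
    data_usage INT,
    alert_time TIMESTAMP(3)
) WITH (
    'connector' = 'print'
);

-- Same query as the Kafka job, but writing to the Print sink;
-- each row is logged with its changelog kind (+I, -U, +U, -D)
INSERT INTO print_sink
SELECT
    p.msisdn,
    p.data_usage,
    CURRENT_TIMESTAMP AS alert_time
FROM paimon_target_table /*+ OPTIONS('scan.mode'='latest-full') */ p
WHERE p.data_usage > 80
    AND (p.status = FALSE OR p.status IS NULL)
    AND p.data_usage IS NOT NULL;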
