Hi everyone,

I’m new to Apache Paimon, and I’m trying to build a simple pipeline: data is
read from a file source into a Flink table, written to a Paimon table, and
then pushed as alerts to a Kafka topic.
*Paimon Table Definition:*
CREATE TABLE paimon_target_table (
msisdn STRING,
data_usage INT,
status BOOLEAN,
PRIMARY KEY (msisdn) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'merge-engine' = 'aggregation',
'fields.data_usage.aggregate-function' = 'sum',
'fields.status.aggregate-function' = 'last_non_null_value',
'changelog-producer' = 'lookup'
);
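To make the merged state I expect concrete, here is a small Python sketch of how I understand the `aggregation` merge engine to combine rows with the same primary key. `sum` adds non-null `data_usage` values and `last_non_null_value` keeps the latest non-null `status`. This assumes that columns omitted from an INSERT arrive as NULL and that NULL inputs are skipped. `Row` and `merge_rows` are hypothetical names. This is a model of the behaviour, not Paimon's implementation.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Row:
    msisdn: str
    data_usage: Optional[int] = None  # assumption: omitted columns arrive as NULL (None)
    status: Optional[bool] = None


def merge_rows(current: Optional[Row], incoming: Row) -> Row:
    """Hypothetical model of the 'aggregation' merge engine for one primary key."""
    if current is None:
        return incoming  # the first row for a key is stored as-is
    # 'sum': NULL inputs are skipped; the result is NULL only if both sides are NULL
    if incoming.data_usage is None:
        usage = current.data_usage
    elif current.data_usage is None:
        usage = incoming.data_usage
    else:
        usage = current.data_usage + incoming.data_usage
    # 'last_non_null_value': a NULL input keeps the previously stored value
    status = incoming.status if incoming.status is not None else current.status
    return replace(current, data_usage=usage, status=status)


# Replays test cases 1 and 2 for msisdn '125'
state = None
for row in [Row("125", 75), Row("125", 75), Row("125", status=True)]:
    state = merge_rows(state, row)
print(state)  # Row(msisdn='125', data_usage=150, status=True)
```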
*Kafka Sink Table:*
CREATE TEMPORARY TABLE kafka_alert_sink (
msisdn STRING,
data_usage INT,
alert_time TIMESTAMP(3),
PRIMARY KEY (msisdn) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'data_usage_kafka',
'properties.bootstrap.servers' = '10.0.3.6:9094',
'key.format' = 'json',
'value.format' = 'json'
);
*Job to Push Alerts to Kafka:*
INSERT INTO kafka_alert_sink
SELECT
p.msisdn,
p.data_usage,
CURRENT_TIMESTAMP AS alert_time
FROM paimon_target_table /*+ OPTIONS('scan.mode'='latest-full') */ p
WHERE p.data_usage > 80
AND (p.status = FALSE OR p.status IS NULL)
AND p.data_usage IS NOT NULL;
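To check which merged rows should trigger an alert, here is the WHERE clause of the job above written as a Python predicate. NULL is modeled as `None`, and a comparison against NULL is treated as not matching, as SQL's WHERE does. The function name `passes_alert_filter` is hypothetical.

```python
from typing import Optional


def passes_alert_filter(data_usage: Optional[int], status: Optional[bool]) -> bool:
    """Mirror of: data_usage > 80 AND (status = FALSE OR status IS NULL) AND data_usage IS NOT NULL."""
    if data_usage is None:  # covers 'IS NOT NULL' and the UNKNOWN result of NULL > 80
        return False
    return data_usage > 80 and (status is False or status is None)


# Merged states that appear in the test cases below
for usage, status in [(75, None), (150, None), (150, True), (175, None), (200, None)]:
    print(usage, status, passes_alert_filter(usage, status))
```

Only the rows with `status = TRUE` or `data_usage <= 80` are rejected, so `(150, TRUE)` after test case 2 should not produce an alert.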
*Test Case 1.* First, I ran the following inserts:
INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('125', 75);
INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('125', 75);
Kafka Output:
{"msisdn":"125","data_usage":150,"alert_time":"2025-10-15 12:51:04.601"}
*2.* Then I updated the record’s status:
INSERT INTO paimon_target_table(msisdn, status) VALUES ('125', TRUE);
Kafka Output:
null
*3.* Next, I inserted a data_usage value for a different msisdn:
INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('128', 175);
Kafka Output:
{"msisdn":"128","data_usage":175,"alert_time":"2025-10-15 12:51:04.601"}
*4.* Finally, I inserted another data_usage update for msisdn '128':
INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('128', 25);
Kafka Output:
null
{"msisdn":"128","data_usage":200,"alert_time":"2025-10-15 12:51:34.616"}
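Here is a Python sketch that reproduces the four Kafka outputs above. It rests on two assumptions that I have not confirmed:

1. With `'changelog-producer' = 'lookup'`, an update to an existing key reaches the job as an UPDATE_BEFORE (-U) row with the old values, followed by an UPDATE_AFTER (+U) row with the merged values.
2. The upsert-kafka sink writes UPDATE_BEFORE and DELETE rows as records with a null value (a tombstone for the key), and INSERT and UPDATE_AFTER rows as JSON.

All function names are hypothetical. `alert_time` is left out for brevity.

```python
import json
from typing import Dict, List, Optional, Tuple

# A row is (msisdn, data_usage, status); None models SQL NULL.
Row = Tuple[str, Optional[int], Optional[bool]]


def merge(old: Optional[Row], new: Row) -> Row:
    """Simplified 'sum' / 'last_non_null_value' merge with NULL inputs skipped."""
    if old is None:
        return new
    usage = old[1] if new[1] is None else (new[1] if old[1] is None else old[1] + new[1])
    status = old[2] if new[2] is None else new[2]
    return (old[0], usage, status)


def changelog(state: Dict[str, Row], new: Row) -> List[Tuple[str, Row]]:
    """Assumption 1: a new key yields +I; an existing key yields -U (old) then +U (merged)."""
    old = state.get(new[0])
    merged = merge(old, new)
    state[new[0]] = merged
    return [("+I", merged)] if old is None else [("-U", old), ("+U", merged)]


def alert_filter(row: Row) -> bool:
    """The WHERE clause of the alert job."""
    _, usage, status = row
    return usage is not None and usage > 80 and (status is False or status is None)


def to_kafka_value(kind: str, row: Row) -> Optional[str]:
    """Assumption 2: -U/-D become a null value (tombstone); +I/+U become JSON."""
    if kind in ("-U", "-D"):
        return None
    return json.dumps({"msisdn": row[0], "data_usage": row[1]})


def run(inserts: List[Row]) -> List[List[Optional[str]]]:
    """Returns the Kafka values emitted for each INSERT statement."""
    state: Dict[str, Row] = {}
    outputs = []
    for new in inserts:
        emitted = [to_kafka_value(kind, row)
                   for kind, row in changelog(state, new) if alert_filter(row)]
        outputs.append(emitted)
    return outputs


steps: List[Row] = [
    ("125", 75, None), ("125", 75, None),  # test case 1
    ("125", None, True),                   # test case 2
    ("128", 175, None),                    # test case 3
    ("128", 25, None),                     # test case 4
]
for step, values in zip(steps, run(steps)):
    print(step, values)
```

If both assumptions hold, each null would be a tombstone for the old version of a row that still matched the filter. In test case 2, the -U row (status NULL) passes the filter while the +U row (status TRUE) is filtered out. In test case 4, both the -U and +U rows pass.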
Why are these null messages being produced in the Kafka sink whenever an
existing key in the Paimon table is updated (especially after changing the
status field)? How can I prevent these null records from being emitted to
Kafka, or filter them out?
Thanks in advance