Hi everyone,

I’m new to Apache Paimon, and I’m trying to build a simple pipeline in which
data is read from a file source into a Flink table, written to a Paimon table,
and then used to push alerts to a Kafka topic.

*Paimon Table Definition:*

CREATE TABLE paimon_target_table (
  msisdn STRING,
  data_usage INT,
  status BOOLEAN,
  PRIMARY KEY (msisdn) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'merge-engine' = 'aggregation',
  'fields.data_usage.aggregate-function' = 'sum',
  'fields.status.aggregate-function' = 'last_non_null_value',
  'changelog-producer' = 'lookup'
);
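
To sanity-check the aggregation settings, here is a small Python model of how I understand the merge engine to combine partial rows per msisdn. It is not Paimon code: `merge` is a hypothetical helper, and it assumes that `sum` skips NULL inputs and that `last_non_null_value` keeps the previous value whenever the incoming one is NULL.

```python
from typing import Dict, Optional, Tuple

# Merged row per primary key (msisdn): (data_usage, status)
State = Tuple[Optional[int], Optional[bool]]


def merge(table: Dict[str, State], msisdn: str,
          data_usage: Optional[int] = None,
          status: Optional[bool] = None) -> State:
    """Fold one partial insert into the merged row for its key.

    Assumption: 'sum' skips NULL inputs and 'last_non_null_value'
    keeps the old value whenever the incoming value is NULL.
    """
    old_usage, old_status = table.get(msisdn, (None, None))

    # 'sum': add when both sides are non-NULL, otherwise keep the non-NULL side
    if data_usage is None:
        new_usage = old_usage
    elif old_usage is None:
        new_usage = data_usage
    else:
        new_usage = old_usage + data_usage

    # 'last_non_null_value': overwrite only with a non-NULL value
    new_status = old_status if status is None else status

    table[msisdn] = (new_usage, new_status)
    return table[msisdn]


# Replay the inserts from the test cases
table: Dict[str, State] = {}
merge(table, "125", data_usage=75)
print(merge(table, "125", data_usage=75))   # (150, None)
print(merge(table, "125", status=True))     # (150, True)
print(merge(table, "128", data_usage=175))  # (175, None)
print(merge(table, "128", data_usage=25))   # (200, None)
```

The modeled totals (150 for '125', 200 for '128') match the data_usage values that show up in the Kafka output below.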


*Kafka Sink Table :*
CREATE TEMPORARY TABLE kafka_alert_sink (
  msisdn STRING,
  data_usage INT,
  alert_time TIMESTAMP(3),
  PRIMARY KEY (msisdn) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'data_usage_kafka',
  'properties.bootstrap.servers' = '10.0.3.6:9094',
  'key.format' = 'json',
  'value.format' = 'json'
);

*Job to Push Alerts to Kafka:*

INSERT INTO kafka_alert_sink
SELECT
  p.msisdn,
  p.data_usage,
  CURRENT_TIMESTAMP AS alert_time
FROM paimon_target_table /*+ OPTIONS('scan.mode'='latest-full') */ p
WHERE p.data_usage > 80
  AND (p.status = FALSE OR p.status IS NULL)
  AND p.data_usage IS NOT NULL;

*Test cases:*

*1.* First, I ran the following inserts:
   INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('125', 75);
   INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('125', 75);

   Kafka Output:

   {"msisdn":"125","data_usage":150,"alert_time":"2025-10-15 12:51:04.601"}

*2.* Then I updated the record’s status:

   INSERT INTO paimon_target_table(msisdn, status) VALUES ('125', TRUE);

   Kafka Output:

   null


*3.* Next, I inserted data_usage for a different msisdn:

   INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('128', 175);

   Kafka Output:

   {"msisdn":"128","data_usage":175,"alert_time":"2025-10-15 12:51:04.601"}

*4.* Finally, I inserted another data_usage update for the same msisdn:

   INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('128', 25);

   Kafka Output:

   null
   {"msisdn":"128","data_usage":200,"alert_time":"2025-10-15 12:51:34.616"}
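
To compare the four steps, here is a small Python sketch (not Flink or Paimon code) that re-applies the alert job's WHERE clause to the merged row of each msisdn before and after each step. The before/after rows are my reading of the aggregation results implied by the outputs above, and `matches`/`classify` are illustrative helpers only.

```python
from typing import List, Optional, Tuple

# Merged row (data_usage, status); None means the key does not exist yet
Row = Optional[Tuple[Optional[int], Optional[bool]]]


def matches(row: Row) -> bool:
    """Mirror of the alert job's WHERE clause (NULL comparisons are not TRUE)."""
    if row is None:
        return False
    data_usage, status = row
    if data_usage is None:
        return False
    return data_usage > 80 and (status is False or status is None)


def classify(before: Row, after: Row) -> str:
    """Describe how one key's membership in the query result changes."""
    was, now = matches(before), matches(after)
    if not was and now:
        return "enters result"
    if was and now:
        return "updated in result"
    if was and not now:
        return "leaves result"
    return "not in result"


# Merged Paimon rows before/after each step, read off the test cases
steps: List[Tuple[str, str, Row, Row]] = [
    ("1 (2nd insert)", "125", (75, None), (150, None)),
    ("2", "125", (150, None), (150, True)),
    ("3", "128", None, (175, None)),
    ("4", "128", (175, None), (200, None)),
]

for step, key, before, after in steps:
    print(f"step {step}: msisdn {key} {classify(before, after)}")
```

It prints "enters result" for steps 1 and 3, "leaves result" for step 2 and "updated in result" for step 4. So in both steps that produced a `null` (2 and 4), the msisdn already had a row in the query result before the update.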


Why are these null messages produced in the Kafka sink whenever the Paimon
table is updated (especially after the status field changes)? How can I
prevent these null records from being emitted to Kafka, or filter them out?

Thanks in advance
