Hi Joice,

Maybe you can try a print sink to verify the output of the Paimon SQL query.
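
For example, a minimal sketch that reuses the schema and filter from your
alert job (print_alert_sink is just an illustrative name; the print connector
simply logs every changelog row it receives):

CREATE TEMPORARY TABLE print_alert_sink (
    msisdn STRING,
    data_usage INT,
    alert_time TIMESTAMP(3)
) WITH (
    -- the print connector writes each row, together with its change kind
    -- (+I / -U / +U / -D), to the TaskManager stdout
    'connector' = 'print'
);

INSERT INTO print_alert_sink
SELECT
    p.msisdn,
    p.data_usage,
    CURRENT_TIMESTAMP AS alert_time
FROM paimon_target_table /*+ OPTIONS('scan.mode'='latest-full') */ p
WHERE p.data_usage > 80
    AND (p.status = FALSE OR p.status IS NULL)
    AND p.data_usage IS NOT NULL;

Comparing those rows with the Kafka output should show which retraction rows
the upsert-kafka sink turns into the null (tombstone) messages.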

Best,
Jingsong

On Tue, Oct 21, 2025 at 2:11 PM Joice Jacob <[email protected]> wrote:
>
> Hi everyone,
>
> I’m new to Apache Paimon, and I’m trying to build a simple pipeline where
> data is read from a file source into a Flink table, written to a Paimon
> table, and then alerts are pushed to a Kafka topic.
>
> Paimon Table Definition:
>
> CREATE TABLE paimon_target_table (
>     msisdn STRING,
>     data_usage INT,
>     status BOOLEAN,
>     PRIMARY KEY (msisdn) NOT ENFORCED
> ) WITH (
>     'connector' = 'paimon',
>     'merge-engine' = 'aggregation',
>     'fields.data_usage.aggregate-function' = 'sum',
>     'fields.status.aggregate-function' = 'last_non_null_value',
>     'changelog-producer' = 'lookup'
> );
>
> Kafka Sink Table:
>
> CREATE TEMPORARY TABLE kafka_alert_sink (
>     msisdn STRING,
>     data_usage INT,
>     alert_time TIMESTAMP(3),
>     PRIMARY KEY (msisdn) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'topic' = 'data_usage_kafka',
>     'properties.bootstrap.servers' = '10.0.3.6:9094',
>     'key.format' = 'json',
>     'value.format' = 'json'
> );
>
> Job to Push Alerts to Kafka:
>
> INSERT INTO kafka_alert_sink
> SELECT
>     p.msisdn,
>     p.data_usage,
>     CURRENT_TIMESTAMP AS alert_time
> FROM paimon_target_table /*+ OPTIONS('scan.mode'='latest-full') */ p
> WHERE p.data_usage > 80
>     AND (p.status = FALSE OR p.status IS NULL)
>     AND p.data_usage IS NOT NULL;
>
>
> Test Case
> 1. First, I ran the following inserts:
>    INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('125', 75);
>    INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('125', 75);
>
>    Kafka Output:
>
>    {"msisdn":"125","data_usage":150,"alert_time":"2025-10-15 12:51:04.601"}
>
> 2. Then I updated the record’s status:
>     INSERT INTO paimon_target_table(msisdn, status) VALUES ('125', TRUE);
>
>     Kafka Output:
>
>     null
>
>
> 3. Then I inserted a data_usage value for a new msisdn:
>
>    INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('128', 175);
>
>    Kafka Output:
>
>     {"msisdn":"128","data_usage":175,"alert_time":"2025-10-15 12:51:04.601"}
>
> 4. Finally, I inserted another data_usage update:
>
>     INSERT INTO paimon_target_table(msisdn, data_usage) VALUES ('128', 25);
>
>     Kafka Output:
>
>     null
>    {"msisdn":"128","data_usage":200,"alert_time":"2025-10-15 12:51:34.616"}
>
>
> Why are these null messages being produced in the Kafka sink whenever the 
> Paimon table is updated (especially after changing the status field)?
> How can I prevent or filter out these null records from being emitted to 
> Kafka?
>
> Thanks in Advance
