flink版本:1.14.3   场景如下:
sql:
set table.exec.state.ttl=1 day;
describe t_k_chargeorder;
describe t_k_appointment;
SELECT
ReportTime,
sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount )  kpitotalcount,
sum( InsertActualPriceCount ) InsertActualPriceCount,
sum( InsertAppointmentCount ) InsertAppointmentCount,
sum( InsertChargeOrderCount ) InsertChargeOrderCount,
now() LastUpdatedDT 
from 
(
SELECT
    DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' ) ReportTime,
    sum( actualprice ) InsertActualPriceCount,
    0 InsertShortMessageCount,
    0 InsertAppointmentCount,
    0 InsertImageCount,
    0 InsertChargeOrderCount,
    0 InsertPerioExamCount,
    0 InsertMedicalCount,
    0 InsertPatientCount,
    0 InsertGeneralExamCount,
    0 InsertFollowupCount 
FROM
    --effective_chargeorder t 
    (SELECT
        o.recordcreatedtime,
        o.recordcreateduser,
        o.status,
        o._is_delete,
        o.appointmentid,
        o.id,
        o.tenantid,
        o.actualprice,
        o.proc_time,
        t.Name,
        t.IsInactive 
    FROM
        t_k_chargeorder AS o
        INNER JOIN t_dental_tenant FOR SYSTEM_TIME AS OF o.proc_time AS t ON 
o.tenantid = t.Id 
    WHERE
        t.IsInactive = '0' 
        AND o.recordcreateduser > 0 
        AND o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' ) 
        AND o._is_delete = '0' 
        AND o.appointmentid > 0) t
WHERE
    recordcreatedtime BETWEEN concat( DATE_FORMAT( now() , 'yyyy-MM-dd' ), ' 
00:00:00' ) 
    AND now() 
GROUP BY
    DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' )  
) a
group by ReportTime;


DAG图如下:
业务库的新增、修改操作都能监听到,并给出正确结果。
但只要是删除语义,kafka的cdc format能消费到删除数据


但sql计算结果却没有作相应的扣减,如下:
删除后应该由150---->100,但什么也没有发生,感觉是内部算子把这条-D给过滤了
恳请大佬解惑~~





回复