Hi iasiuide,
Could you share the Flink version and the JDBC connector version you are using? As far as I know,
FLINK-33365 [1] fixed an issue in the JDBC connector where lookup join conditions were lost.
[1] https://issues.apache.org/jira/browse/FLINK-33365
Best regards
> On March 8, 2024, at 11:02, iasiuide wrote:
>
> The image may not load, so here is the SQL fragment from the image:
> ..
>     END AS trans_type,
>     a.div_fee_amt,
>     a.ts
>   FROM
>     ods_ymfz_prod_sys_divide_order a
>     LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time
>       AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
>       AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'MMdd')
>     LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time
>       AS c ON b.member_id = c.pk_id
>       AND c.data_source = 'merch'
>     LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time
>       AS d ON c.agent_id = d.pk_id
>       AND (
>         d.data_source = 'ex_agent'
>         OR d.data_source = 'agent'
>       )
>     LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time
>       AS d1 ON d.fagent_id = d1.pk_id
>       AND d1.data_source = 'agent'
>   WHERE
>     a.order_state = '2'
>     AND a.divide_fee_amt > 0
> ) dat
> WHERE
>   trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, '-MM-dd')
>   AND CHAR_LENGTH(member_id) > 1;
>
> At 2024-03-08 10:54:19, "iasiuide" wrote:
>
> In the SQL fragment below:
> ods_ymfz_prod_sys_divide_order is a Kafka source table
> dim_ymfz_prod_sys_trans_log is a MySQL dimension table
> dim_ptfz_ymfz_merchant_info is a MySQL dimension table
>
> The execution plan fragment shown in the Flink web UI is as follows:
>
> [1]:TableSourceScan(table=[[default_catalog, default_database,
> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time),
> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))),
> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id,
> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
> +- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time,
> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 *
> divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time
> IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3)))) AS ts],
> where=[((order_state = '2') AND (divide_fee_amt > 0) AND (sys_date =
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), '-MM-dd')))])
> +- [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date =
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))],
> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts,
> bg_rel_trans_id, pay_type, member_id, mer_name])
> +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type,
> member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
> +- [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source =
> 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type,
> member_id, mer_name, pk_id, agent_id, bagent_id])
> +- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts,
> pay_type, member_id, mer_name, agent_id, bagent_id])
> +- [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id],
> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])],
> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id,
> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
> +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts,
> pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS
> fagent_id0])
> +- [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source
> = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type,
> member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name,
> bagent_name])
>
> Why is the condition on the first dimension table dim_ymfz_prod_sys_trans_log,
> AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'MMdd'),
> not used as a lookup condition in the execution plan ==>
> lookup=[bg_rel_trans_id=bg_rel_trans_id],
> while for the second dimension table dim_ptfz_ymfz_merchant_info the conditions
> ON b.member_id = c.pk_id AND c.data_source = 'merch'
> are both used as lookup conditions in the execution plan ==>
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
> and for the third dimension table dim_ptfz_ymfz_merchant_info, with the conditions
> ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR d.data_source = 'agent'),
> the data_source condition is not a lookup condition in the execution plan ==>
> lookup=[pk_id=agent_id]?
>