Re: 急 [FLINK-34170] 何时能够修复?

2024-03-14 文章 Benchao Li
FLINK-34170 只是一个UI的展示问题,并不影响实际的运行。

JDBC Connector 维表下推的 filter 不生效问题,已经在 FLINK-33365 中修复了,最新的 JDBC
Connector 版本中已经带上了这个修复,你可以试一下~

casel.chen  于2024年3月15日周五 10:39写道:
>
> 我们最近在使用Flink 1.17.1开发flink sql作业维表关联使用复合主键时遇到FLINK-34170描述一样的问题,请问这个major 
> issue什么时候在哪个版本后能够修复呢?谢谢!
>
>
> select xxx from kafka_table as kt
> left join phoenix_table FORSYSTEM_TIMEASOFphoenix_table.proctime as pt
> on kt.trans_id=pt.trans_id and pt.trans_date = 
> DATE_FORMAT(CURRENT_TIMESTAMP,'MMdd');
>
>
> phoenix表主键是 trans_id + trans_date 
> 复合主键,实际作业运行发现flink只会带trans_id字段对phoenix表进行scan查询,再根据scan查询结果按trans_date字段值进行过滤
>
>
> https://issues.apache.org/jira/browse/FLINK-34170



-- 

Best,
Benchao Li


急 [FLINK-34170] 何时能够修复?

2024-03-14 文章 casel.chen
我们最近在使用Flink 1.17.1开发flink sql作业维表关联使用复合主键时遇到FLINK-34170描述一样的问题,请问这个major 
issue什么时候在哪个版本后能够修复呢?谢谢!


select xxx from kafka_table as kt 
left join phoenix_table FORSYSTEM_TIMEASOFphoenix_table.proctime as pt
on kt.trans_id=pt.trans_id and pt.trans_date = 
DATE_FORMAT(CURRENT_TIMESTAMP,'MMdd');


phoenix表主键是 trans_id + trans_date 
复合主键,实际作业运行发现flink只会带trans_id字段对phoenix表进行scan查询,再根据scan查询结果按trans_date字段值进行过滤


https://issues.apache.org/jira/browse/FLINK-34170

Re: Re:flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-14 文章 Jane Chan
Hi iasiuide,

感谢提问. 先来回答最后一个问题

关联维表的限制条件有的会作为关联条件,有的不作为关联条件吗? 这种有什么规律吗?
>

Lookup join 的 on condition 会在优化过程中经过一系列改写, 这里只简要对影响 lookup 和 where 的几处进行说明.

1. logical 阶段, FlinkFilterJoinRule 会将 on 条件 split 为针对单边的 (左表/右表) 和针对双边的.
**针对单边的 filter 会被尽量 pushdown 到 join 节点之前** (这意味着有可能会额外生成一个 Filter 节点);
Filter 节点后续如何变化取决于这个 filter 能否 pushdown 到 source, 如果不能, 那么在 physical
阶段它就会变成维表上面 Calc 节点 (denoted by calcOnTemporalTable) 里面的 condition.

2. 在 CommonPhysicalLookupJoin 里解析 allLookupKeys 的时候, 会试图从
calcOnTemporalTable 里把常量条件抽取出来形成最终的 lookup key (也就是 explain plan 里面
lookup=[...] 的内容), 在 explain 时, 只要存在 calcOnTemporalTable, where=[...]
就会被打印出来.

回到具体的 case

为什么关联第一张维表dim_ymfz_prod_sys_trans_log的限制条件AND b.trans_date = DATE_FORMAT
> (CURRENT_TIMESTAMP, 'MMdd') 在执行计划中,不作为 lookup的条件 ==>
> lookup=[bg_rel_trans_id=bg_rel_trans_id],
>

因为 b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'MMdd')
是针对维表单边的条件且无法被下推. 另外, 这里使用了非确定性函数[1], 请关注结果的正确性.


> 关联第二张维表 dim_ptfz_ymfz_merchant_info 的限制条件ON b.member_id = c.pk_id AND
> c.data_source = 'merch' 在执行计划中,都是作为lookup的条件 ==>
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
>

此时常量可以被提取出来


> 关联第三张维表dim_ptfz_ymfz_merchant_info的限制条件 ON c.agent_id = d.pk_id AND
> (d.data_source = 'ex_agent' OR d.data_source = 'agent')
> 中关于data_source的条件,在执行计划中不是lookup的条件 ==> lookup=[pk_id=agent_id],
>

据我所知 lookup 目前应该还不支持 SARGable

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/

Best,
Jane

On Fri, Mar 8, 2024 at 11:19 AM iasiuide  wrote:

> 好的,已经贴了sql片段
>
> 在 2024-03-08 11:02:34,"Xuyang"  写道:
> >Hi, 你的图挂了,可以用图床或者直接贴SQL
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >在 2024-03-08 10:54:19,"iasiuide"  写道:
> >
> >
> >
> >
> >
> >下面的sql片段中
> >ods_ymfz_prod_sys_divide_order  为kafka source表
> >dim_ymfz_prod_sys_trans_log   为mysql为表
> >dim_ptfz_ymfz_merchant_info   为mysql为表
> >
> >
> >
> >flink web ui界面的执行计划片段如下:
> >
> > [1]:TableSourceScan(table=[[default_catalog, default_database,
> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time),
> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))),
> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id,
> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
> >+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time,
> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 *
> divide_fee_amt), divide_fee_amt) AS div_fee_amt,
> Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time
> AS TIMESTAMP(3 AS ts], where=[((order_state = '2') AND (divide_fee_amt
>  0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS
> TIMESTAMP(9)), '-MM-dd')))])
> >   +-
> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date =
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))],
> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts,
> bg_rel_trans_id, pay_type, member_id, mer_name])
> >  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts,
> pay_type, member_id, mer_name], where=[(CHAR_LENGTH(member_id)  1)])
> > +-
> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source
> = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type,
> member_id, mer_name, pk_id, agent_id, bagent_id])
> >+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts,
> pay_type, member_id, mer_name, agent_id, bagent_id])
> >   +-
> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id],
> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])],
> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id,
> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
> >  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt,
> ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS
> fagent_id0])
> > +-
> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
> joinType=[LeftOuterJoin], async=[false],
> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0],
> where=[(data_source = 'agent')], select=[sys_date, create_time,
> div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0,
> fagent_id0, pk_id, agent_name, bagent_name])
> >  
> >
> >
> >为什么关联第一张维表dim_ymfz_prod_sys_trans_log的限制条件AND b.trans_date = DATE_FORMAT
> (CURRENT_TIMESTAMP, 'MMdd') 在执行计划中,不作为 lookup的条件 ==>
> lookup=[bg_rel_trans_id=bg_rel_trans_id],
> >关联第二张维表 dim_ptfz_ymfz_merchant_info 的限制条件ON b.member_id = c.pk_id AND
> 

flink k8s operator chk config interval bug.inoperative

2024-03-14 文章 kcz
kcz
573693...@qq.com





Re:flink写kafka时,并行度和分区数的设置问题

2024-03-14 文章 熊柱
退订

















在 2024-03-13 15:25:27,"chenyu_opensource"  写道:
>您好:
> flink将数据写入kafka【kafka为sink】,当kafka 
> topic分区数【设置的60】小于设置的并行度【设置的300】时,task是轮询写入这些分区吗,是否会影响写入效率?【是否存在遍历时的耗时情况】。
> 此时,如果扩大topic的分区数【添加至200,或者直接到300】,写入的效率是否会有明显的提升?
>
> 是否有相关的源码可以查看。
>期待回复,祝好,谢谢!
>
>
>