Flink SQL消费kafka topic有办法限速么?

2024-05-27 文章 casel.chen
Flink SQL消费kafka topic有办法限速么?场景是消费kafka 
topic数据写入下游mongodb,在业务高峰期时下游mongodb写入压力大,希望能够限速消费kafka,请问要如何实现?

Re: Flink sql retract to append

2024-04-30 文章 Zijun Zhao
以处理时间为升序,处理结果肯定不会出现回撤的,因为往后的时间不会比当前时间小了,你可以在试试这个去重

On Tue, Apr 30, 2024 at 3:35 PM 焦童  wrote:

> 谢谢你的建议  但是top-1也会产生回撤信息
>
> > 2024年4月30日 15:27,ha.fen...@aisino.com 写道:
> >
> > 可以参考这个
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/table/sql/queries/deduplication/
> > 1.11版本不知道是不是支持
> >
> > From: 焦童
> > Date: 2024-04-30 11:25
> > To: user-zh
> > Subject: Flink sql retract to append
> > Hello ,
> > 我使用Flink 1.11 版本 sql  进行数据去重(通过 group by
> 形式)但是这会产生回撤流,下游存储不支持回撤流信息仅支持append,在DataStream
> 中我可以通过状态进行去重,但是在sql中如何做到去重且不产生回撤流呢。谢谢各位
>
>


Re: Flink sql retract to append

2024-04-30 文章 焦童
谢谢你的建议  但是top-1也会产生回撤信息  

> 2024年4月30日 15:27,ha.fen...@aisino.com 写道:
> 
> 可以参考这个
> https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/table/sql/queries/deduplication/
> 1.11版本不知道是不是支持
> 
> From: 焦童
> Date: 2024-04-30 11:25
> To: user-zh
> Subject: Flink sql retract to append
> Hello ,
> 我使用Flink 1.11 版本 sql  进行数据去重(通过 group by 
> 形式)但是这会产生回撤流,下游存储不支持回撤流信息仅支持append,在DataStream 
> 中我可以通过状态进行去重,但是在sql中如何做到去重且不产生回撤流呢。谢谢各位



Flink sql retract to append

2024-04-29 文章 焦童
Hello ,
 我使用Flink 1.11 版本 sql  进行数据去重(通过 group by 
形式)但是这会产生回撤流,下游存储不支持回撤流信息仅支持append,在DataStream 
中我可以通过状态进行去重,但是在sql中如何做到去重且不产生回撤流呢。谢谢各位

Re:Re: flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-07 文章 iasiuide
你好,我们用的是1.13.2和1.15.4版本的,看了下flink ui,这两种版本针对下面sql片段的lookup执行计划中的关联维表条件是一样的


在 2024-03-08 11:08:51,"Yu Chen"  写道:
>Hi iasiuide,
>方便share一下你使用的flink版本与jdbc connector的版本吗?据我所了解,jdbc 
>connector在FLINK-33365[1]解决了lookup join条件丢失的相关问题。
>
>[1] https://issues.apache.org/jira/browse/FLINK-33365
>
>祝好~
>
>> 2024年3月8日 11:02,iasiuide  写道:
>> 
>> 
>> 
>> 
>> 图片可能加载不出来,下面是图片中的sql片段 
>> ..
>> END AS trans_type,
>> 
>>  a.div_fee_amt,
>> 
>>  a.ts
>> 
>>FROM
>> 
>>  ods_ymfz_prod_sys_divide_order a
>> 
>>  LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time 
>> AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
>> 
>>  AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'MMdd')
>> 
>>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
>> AS c ON b.member_id = c.pk_id
>> 
>>  AND c.data_source = 'merch'
>> 
>>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
>> AS d ON c.agent_id = d.pk_id
>> 
>>  AND (
>> 
>>d.data_source = 'ex_agent'
>> 
>>OR d.data_source = 'agent'
>> 
>>  ) 
>> 
>>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
>> AS d1 ON d.fagent_id = d1.pk_id
>> 
>>  AND d1.data_source = 'agent'
>> 
>>WHERE 
>> 
>>  a.order_state = '2' 
>> 
>>  AND a.divide_fee_amt > 0
>> 
>>  ) dat
>> 
>> WHERE
>> 
>>  trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, '-MM-dd')
>> 
>>  AND CHAR_LENGTH(member_id) > 1;
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2024-03-08 10:54:19,"iasiuide"  写道:
>> 
>> 
>> 
>> 
>> 
>> 下面的sql片段中
>> ods_ymfz_prod_sys_divide_order  为kafka source表
>> dim_ymfz_prod_sys_trans_log   为mysql为表
>> dim_ptfz_ymfz_merchant_info   为mysql为表
>> 
>> 
>> 
>> flink web ui界面的执行计划片段如下:
>> 
>> [1]:TableSourceScan(table=[[default_catalog, default_database, 
>> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
>> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
>> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
>> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
>> +- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, 
>> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * 
>> divide_fee_amt), divide_fee_amt) AS div_fee_amt, 
>> Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time 
>> AS TIMESTAMP(3 AS ts], where=[((order_state = '2') AND (divide_fee_amt 
>>  0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS 
>> TIMESTAMP(9)), '-MM-dd')))])
>>   +- 
>> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
>>  joinType=[LeftOuterJoin], async=[false], 
>> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
>> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))], 
>> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
>> bg_rel_trans_id, pay_type, member_id, mer_name])
>>  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
>> member_id, mer_name], where=[(CHAR_LENGTH(member_id)  1)])
>> +- 
>> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>>  joinType=[LeftOuterJoin], async=[false], 
>> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source 
>> = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
>> member_id, mer_name, pk_id, agent_id, bagent_id])
>>+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
>> pay_type, member_id, mer_name, agent_id, bagent_id])
>>   +- 
>> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>>  joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
>> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
>> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, 
>> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
>>  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
>> pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS 
>> fagent_id0])
>> +- 
>> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>>  joinType=[LeftOuterJoin], async=[false], 
>> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source 
>> = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
>> member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, 
>> bagent_name])
>>  
>> 
>> 
>> 为什么关联第一张维表dim_ymfz_prod_sys_trans_log的限制条件AND b.trans_date = DATE_FORMAT 
>> (CURRENT_TIMESTAMP, 'MMdd') 在执行计划中,不作为 lookup的条件 ==> 
>> lookup=[bg_rel_trans_id=bg_rel_trans_id],
>> 关联第二张维表 dim_ptfz_ymfz_merchant_info 的限制条件ON b.member_id = c.pk_id AND 
>> c.data_source = 'merch' 在执行计划中,都是作为lookup的条件 ==> 
>> 

Re: flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-07 文章 Yu Chen
Hi iasiuide,
方便share一下你使用的flink版本与jdbc connector的版本吗?据我所了解,jdbc 
connector在FLINK-33365[1]解决了lookup join条件丢失的相关问题。

[1] https://issues.apache.org/jira/browse/FLINK-33365

祝好~

> 2024年3月8日 11:02,iasiuide  写道:
> 
> 
> 
> 
> 图片可能加载不出来,下面是图片中的sql片段 
> ..
> END AS trans_type,
> 
>  a.div_fee_amt,
> 
>  a.ts
> 
>FROM
> 
>  ods_ymfz_prod_sys_divide_order a
> 
>  LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time 
> AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
> 
>  AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'MMdd')
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS c ON b.member_id = c.pk_id
> 
>  AND c.data_source = 'merch'
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS d ON c.agent_id = d.pk_id
> 
>  AND (
> 
>d.data_source = 'ex_agent'
> 
>OR d.data_source = 'agent'
> 
>  ) 
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS d1 ON d.fagent_id = d1.pk_id
> 
>  AND d1.data_source = 'agent'
> 
>WHERE 
> 
>  a.order_state = '2' 
> 
>  AND a.divide_fee_amt > 0
> 
>  ) dat
> 
> WHERE
> 
>  trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, '-MM-dd')
> 
>  AND CHAR_LENGTH(member_id) > 1;
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2024-03-08 10:54:19,"iasiuide"  写道:
> 
> 
> 
> 
> 
> 下面的sql片段中
> ods_ymfz_prod_sys_divide_order  为kafka source表
> dim_ymfz_prod_sys_trans_log   为mysql为表
> dim_ptfz_ymfz_merchant_info   为mysql为表
> 
> 
> 
> flink web ui界面的执行计划片段如下:
> 
> [1]:TableSourceScan(table=[[default_catalog, default_database, 
> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
> +- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, 
> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * 
> divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time 
> IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3 AS ts], 
> where=[((order_state = '2') AND (divide_fee_amt  0) AND (sys_date = 
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), '-MM-dd')))])
>   +- 
> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))], 
> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
> bg_rel_trans_id, pay_type, member_id, mer_name])
>  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name], where=[(CHAR_LENGTH(member_id)  1)])
> +- 
> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 
> 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name, pk_id, agent_id, bagent_id])
>+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
> pay_type, member_id, mer_name, agent_id, bagent_id])
>   +- 
> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, 
> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
>  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
> pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS 
> fagent_id0])
> +- 
> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source 
> = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, 
> bagent_name])
>  
> 
> 
> 为什么关联第一张维表dim_ymfz_prod_sys_trans_log的限制条件AND b.trans_date = DATE_FORMAT 
> (CURRENT_TIMESTAMP, 'MMdd') 在执行计划中,不作为 lookup的条件 ==> 
> lookup=[bg_rel_trans_id=bg_rel_trans_id],
> 关联第二张维表 dim_ptfz_ymfz_merchant_info 的限制条件ON b.member_id = c.pk_id AND 
> c.data_source = 'merch' 在执行计划中,都是作为lookup的条件 ==> 
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
> 关联第三张维表dim_ptfz_ymfz_merchant_info的限制条件 ON c.agent_id = d.pk_id AND 
> (d.data_source = 'ex_agent' OR d.data_source = 'agent') 
> 中关于data_source的条件,在执行计划中不是lookup的条件 ==> lookup=[pk_id=agent_id],
> 

flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-07 文章 iasiuide




下面的sql片段中
ods_ymfz_prod_sys_divide_order  为kafka source表
dim_ymfz_prod_sys_trans_log   为mysql为表
dim_ptfz_ymfz_merchant_info   为mysql为表



flink web ui界面的执行计划片段如下:

 [1]:TableSourceScan(table=[[default_catalog, default_database, 
ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, 
Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS 
div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, 
CAST(create_time AS TIMESTAMP(3 AS ts], where=[((order_state = '2') AND 
(divide_fee_amt  0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS 
TIMESTAMP(9)), '-MM-dd')))])
   +- 
[3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
 joinType=[LeftOuterJoin], async=[false], 
lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))], 
select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
bg_rel_trans_id, pay_type, member_id, mer_name])
  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
member_id, mer_name], where=[(CHAR_LENGTH(member_id)  1)])
 +- 
[5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', 
pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, 
bagent_id])
+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, agent_id, bagent_id])
   +- 
[7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, 
agent_id, bagent_id, pk_id, bagent_id, fagent_id])
  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
 +- 
[9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', 
pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, 
bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])
  


为什么关联第一张维表dim_ymfz_prod_sys_trans_log的限制条件AND b.trans_date = DATE_FORMAT 
(CURRENT_TIMESTAMP, 'MMdd') 在执行计划中,不作为 lookup的条件 ==> 
lookup=[bg_rel_trans_id=bg_rel_trans_id],
关联第二张维表 dim_ptfz_ymfz_merchant_info 的限制条件ON b.member_id = c.pk_id AND 
c.data_source = 'merch' 在执行计划中,都是作为lookup的条件 ==> 
lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
关联第三张维表dim_ptfz_ymfz_merchant_info的限制条件 ON c.agent_id = d.pk_id AND 
(d.data_source = 'ex_agent' OR d.data_source = 'agent') 
中关于data_source的条件,在执行计划中不是lookup的条件 ==> lookup=[pk_id=agent_id],
关联维表的限制条件有的会作为关联条件,有的不作为关联条件吗? 这种有什么规律吗? 因为这个会关乎维表的索引字段的设置。







Re: flink sql作业如何统计端到端延迟

2024-03-04 文章 Shawn Huang
Flink有一个端到端延迟的指标,可以参考以下文档[1],看看是否有帮助。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/ops/metrics/#end-to-end-latency-tracking

Best,
Shawn Huang


casel.chen  于2024年2月21日周三 15:31写道:

> flink sql作业从kafka消费mysql过来的canal
> json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?mysql表有update_time字段代表业务更新记录时间。
> doris系统可以在表schema新增一个更新时间列ingest_time,所以在doris表上可以通过ingest_time -
> update_time算出端到端时延,但这种方法只能离线统计,有没有实时统计以方便实时监控的方法?
>
> 查了SinkFunction类的invoke方法虽然带有Context类型参数可以获取当前处理时间和事件时间,但因为大部分sink都是采用攒微批方式再批量写入的,所以这两个时间直接相减得到的时间差并不能代表真实落库的时延。有没有精确获取时延的方法呢?


Re:Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-21 文章 Xuyang
Hi, 
> 那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
对,具体可以参考下这个内部实现的算子[1]


> 新的sink 
> v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
>  - context.timestamp()得到sink延迟呢?
应该是可以的,就是可能因为各tm的机器时间会有略微差异的情况,不会特别准,但是应该也够用了。


[1] 
https://github.com/apache/flink/blob/e7e973e212d0ca04855af3036fc5b73888b8e0e5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java#L314




--

Best!
Xuyang





在 2024-02-21 15:17:49,"casel.chen"  写道:
>感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
>我看新的sink 
>v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
> - context.timestamp()得到sink延迟呢?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2024-02-21 09:41:37,"Xuyang"  写道:
>>Hi, chen. 
>>可以试一下在sink function的invoke函数中使用:
>>
>>
>>@Override
>>public void invoke(RowData row, Context context) throws Exception {
>>context.currentProcessingTime(); 
>>context.currentWatermark(); 
>>...
>>}
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best!
>>Xuyang
>>
>>
>>
>>
>>
>>在 2024-02-20 19:38:44,"Feng Jin"  写道:
>>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>>
>>>Best,
>>>Feng
>>>
>>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>>
>>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>>>
>>>>
>>>> public class XxxSinkFunction extends RichSinkFunction implements
>>>> CheckpointedFunction, CheckpointListener {
>>>>
>>>>
>>>> @Override
>>>> public synchronized void invoke(RowData rowData, Context context)
>>>> throws IOException {
>>>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>>>> }
>>>> }
>>>>
>>>>
>>>> 例如source table如下定义
>>>>
>>>>
>>>> CREATE TEMPORARY TABLE source_table(
>>>>   username varchar,
>>>>   click_url varchar,
>>>>   eventtime varchar,
>>>>
>>>>   ts AS TO_TIMESTAMP(eventtime),
>>>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>>>> ) with (
>>>>   'connector'='kafka',
>>>>   ...
>>>>
>>>> );
>>>>
>>>>
>>>> CREATE TEMPORARY TABLE sink_table(
>>>>   username varchar,
>>>>   click_url varchar,
>>>>   eventtime varchar
>>>> ) with (
>>>>   'connector'='xxx',
>>>>   ...
>>>> );
>>>> insert into sink_table select username,click_url,eventtime from
>>>> source_table;


flink sql作业如何统计端到端延迟

2024-02-20 文章 casel.chen
flink sql作业从kafka消费mysql过来的canal 
json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?mysql表有update_time字段代表业务更新记录时间。
doris系统可以在表schema新增一个更新时间列ingest_time,所以在doris表上可以通过ingest_time - 
update_time算出端到端时延,但这种方法只能离线统计,有没有实时统计以方便实时监控的方法?
查了SinkFunction类的invoke方法虽然带有Context类型参数可以获取当前处理时间和事件时间,但因为大部分sink都是采用攒微批方式再批量写入的,所以这两个时间直接相减得到的时间差并不能代表真实落库的时延。有没有精确获取时延的方法呢?

Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
我看新的sink 
v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
 - context.timestamp()得到sink延迟呢?














在 2024-02-21 09:41:37,"Xuyang"  写道:
>Hi, chen. 
>可以试一下在sink function的invoke函数中使用:
>
>
>@Override
>public void invoke(RowData row, Context context) throws Exception {
>context.currentProcessingTime(); 
>context.currentWatermark(); 
>...
>}
>
>
>
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2024-02-20 19:38:44,"Feng Jin"  写道:
>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>
>>Best,
>>Feng
>>
>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>
>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>>
>>>
>>> public class XxxSinkFunction extends RichSinkFunction implements
>>> CheckpointedFunction, CheckpointListener {
>>>
>>>
>>> @Override
>>> public synchronized void invoke(RowData rowData, Context context)
>>> throws IOException {
>>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>>> }
>>> }
>>>
>>>
>>> 例如source table如下定义
>>>
>>>
>>> CREATE TEMPORARY TABLE source_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar,
>>>
>>>   ts AS TO_TIMESTAMP(eventtime),
>>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>>> ) with (
>>>   'connector'='kafka',
>>>   ...
>>>
>>> );
>>>
>>>
>>> CREATE TEMPORARY TABLE sink_table(
>>>   username varchar,
>>>   click_url varchar,
>>>   eventtime varchar
>>> ) with (
>>>   'connector'='xxx',
>>>   ...
>>> );
>>> insert into sink_table select username,click_url,eventtime from
>>> source_table;


Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Xuyang
Hi, chen. 
可以试一下在sink function的invoke函数中使用:


@Override
public void invoke(RowData row, Context context) throws Exception {
context.currentProcessingTime(); 
context.currentWatermark(); 
...
}







--

Best!
Xuyang





在 2024-02-20 19:38:44,"Feng Jin"  写道:
>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>
>Best,
>Feng
>
>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>
>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>
>>
>> public class XxxSinkFunction extends RichSinkFunction implements
>> CheckpointedFunction, CheckpointListener {
>>
>>
>> @Override
>> public synchronized void invoke(RowData rowData, Context context)
>> throws IOException {
>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>> }
>> }
>>
>>
>> 例如source table如下定义
>>
>>
>> CREATE TEMPORARY TABLE source_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar,
>>
>>   ts AS TO_TIMESTAMP(eventtime),
>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>> ) with (
>>   'connector'='kafka',
>>   ...
>>
>> );
>>
>>
>> CREATE TEMPORARY TABLE sink_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar
>> ) with (
>>   'connector'='xxx',
>>   ...
>> );
>> insert into sink_table select username,click_url,eventtime from
>> source_table;


Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Feng Jin
我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.

Best,
Feng

On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:

> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>
>
> public class XxxSinkFunction extends RichSinkFunction implements
> CheckpointedFunction, CheckpointListener {
>
>
> @Override
> public synchronized void invoke(RowData rowData, Context context)
> throws IOException {
>//  这里想从rowData中获取event time和watermark值,如何实现呢?
> }
> }
>
>
> 例如source table如下定义
>
>
> CREATE TEMPORARY TABLE source_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar,
>
>   ts AS TO_TIMESTAMP(eventtime),
>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
> ) with (
>   'connector'='kafka',
>   ...
>
> );
>
>
> CREATE TEMPORARY TABLE sink_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar
> ) with (
>   'connector'='xxx',
>   ...
> );
> insert into sink_table select username,click_url,eventtime from
> source_table;


flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?


public class XxxSinkFunction extends RichSinkFunction implements 
CheckpointedFunction, CheckpointListener {


@Override
public synchronized void invoke(RowData rowData, Context context) throws 
IOException {
   //  这里想从rowData中获取event time和watermark值,如何实现呢?
}
}


例如source table如下定义


CREATE TEMPORARY TABLE source_table(
  username varchar,
  click_url varchar,
  eventtime varchar,
  ts AS TO_TIMESTAMP(eventtime),
  WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
) with (
  'connector'='kafka',
  ...

);


CREATE TEMPORARY TABLE sink_table(
  username varchar,
  click_url varchar,
  eventtime varchar
) with (
  'connector'='xxx',
  ...
);
insert into sink_table select username,click_url,eventtime from source_table;

RE: Flink SQL Windowing TVFs

2023-12-28 文章 Jiabao Sun
Hi,

在 1.14.0 版本中,CUMULATE 函数是需要用在GROUP BY聚合场景下的[1]。
部署到生产的 SQL 是否包含了 GROUP BY 表达式?
本地测试的Flink版本是不是1.14.0?

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate



On 2023/12/29 04:57:09 "jiaot...@mail.jj.cn" wrote:
> Hi,
>  我在使用1.14.0版本Flink,本地测试了CUMULATE(TABLE kafka, DESCRIPTOR(rowtime), 
> INTERVAL '60' SECOND, INTERVAL '1' DAYS)方法可以正常运行,但是当我将其部署到线上环境报了如下错误:
>  org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error: Currently Flink doesn't support individual window 
> table-valued function CUMULATE(time_col=[rowtime], max_size=[8640 ms], 
> step=[1 min]).
>  Please use window table-valued function with the following computations:
>  1. aggregate using window_start and window_end as group keys.
>  2. topN using window_start and window_end as partition key.
>  3. join with join condition contains window starts equality of input 
> tables and window ends equality of input tables.
>  请问这是因为线上包版本导致的吗,如果是版本问题,具体是哪一个包呢
>  非常感谢
> 

Flink SQL作业配置'table.exec.sink.upsert-materialize'参数会影响TIMESTAMP类型精度?

2023-11-30 文章 casel.chen
线上有一个flink sql作业,创建和更新时间列使用的是 TIMESTAMP(3) 
类型,没有配置'table.exec.sink.upsert-materialize'参数时是正常时间写入的`-MM-dd 
HH:mm:ss.SSS`格式,
然后添加了'table.exec.sink.upsert-materialize'='NONE'参数后,输出的时间格式变成了 `-MM-dd 
HH:mm:ss.SS`。数据类型变成了TIMESTAMP(6),请问这是已知的issue么?


-U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
+U[2023-11-30T12:43:04.676821, 2023-11-30, 000143554006, 20231130, 
23113012430450887882, F, 000143718775, 10.00, 44, 07, 2, 
23113012430450887895, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
+I[2023-11-29T17:37:01.556478, 2023-11-29, 000141180318, 20231129, 
2f1edf1e3337642d, P, 000141538175, 246.00, 999, 01, 2, 
23112917370147645164, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
-U[2023-11-25T16:02:45.145392, 2023-11-25, 000141288683, 20231125, 
2023112516024553495256400285, P, , 1200.00, 81, 02, 1, 23112516024525664244, 
2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
+U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
+U[2023-11-25T16:02:45.145392, 2023-11-25, 000141288683, 20231125, 
2023112516024553495256400285, F, 000141586078, 1200.00, 81, 02, 1, 
23112516024525664244, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
-U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
-U[2023-11-28T14:53:21.349043, 2023-11-28, 000137842973, 20231128, 
HFPWALLET23112814532140921335, P, 000142774221, 62.98, 86, 06, 4, 
538014532140921373, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]

Re:回复: flink sql如何实现json字符数据解析?

2023-11-29 文章 casel.chen
社区Flink自带的那些json函数都没有解析一串json string返回一行或多行ROW的

















在 2023-11-23 15:24:33,"junjie.m...@goupwith.com"  写道:
>可以看下JSON函数
>https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/functions/systemfunctions/#json-functions
>
>
>
>Junjie.M
> 
>发件人: casel.chen
>发送时间: 2023-11-22 20:54
>收件人: user-zh@flink.apache.org
>主题: flink sql如何实现json字符数据解析?
>输入:
> 
>{
> 
>  "uuid":"",
> 
>  "body_data": 
> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
> 
>}
> 
> 
> 
> 
>输出:
> 
>[
> 
>  {
> 
>"uuid": "",
> 
>"body_data: null,
> 
>"body_data.fild1": "123”,
> 
>"body_data.fild2": "234"
> 
>  },
> 
>  {
> 
>"uuid": "",
> 
>"body_data": null,
> 
>"body_data.fild1": "abc",
> 
>"body_data.fild2": "cdf"
> 
>  }
> 
>]
> 
> 
> 
> 
>当格式错误时
> 
> 
> 
> 
>输入:
> 
>{
> 
>"uuid": "”,
> 
>"body_data": "abc"
> 
>}
> 
>输出:
> 
>{
> 
>"uuid": "",
> 
>"body_data": "abc",
> 
>"body_data.fild1": null,
> 
>"body_data.fild2": null
> 
>}


Re:Re: flink sql如何实现json字符数据解析?

2023-11-29 文章 casel.chen



filed字段数量是固定的,但body_data数组包含的元素个数不固定,所以

Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as 
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
null,body_data[2]. field, body_data[2]. field2)] as result




这种写死body_data[X]的sql语句应该不work








在 2023-11-23 15:10:00,"jinzhuguang"  写道:
>Flink 
>SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
>比如:
>
>SourceT: (
>   uuid String,
>   body_data ARRAY>
>)
>
>SinkT (
>   result ARRAY String, body_data.fild2  String>>
>)
>
>Insert into SinkT (result)  select Array[ROW(uuid, null,body_data[1]. field1 
>as body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
>null,body_data[2]. field, body_data[2]. field2)] as result
>
>希望对你有帮助
>
>> 2023年11月22日 20:54,casel.chen  写道:
>> 
>> 输入:
>> 
>> {
>> 
>>  "uuid":"",
>> 
>>  "body_data": 
>> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
>> 
>> }
>> 
>> 
>> 
>> 
>> 输出:
>> 
>> [
>> 
>>  {
>> 
>> "uuid": "",
>> 
>> "body_data: null,
>> 
>> "body_data.fild1": "123”,
>> 
>> "body_data.fild2": "234"
>> 
>>  },
>> 
>>  {
>> 
>> "uuid": "",
>> 
>> "body_data": null,
>> 
>> "body_data.fild1": "abc",
>> 
>> "body_data.fild2": "cdf"
>> 
>>  }
>> 
>> ]
>> 
>> 
>> 
>> 
>> 当格式错误时
>> 
>> 
>> 
>> 
>> 输入:
>> 
>> {
>> 
>> "uuid": "”,
>> 
>> "body_data": "abc"
>> 
>> }
>> 
>> 输出:
>> 
>> {
>> 
>> "uuid": "",
>> 
>> "body_data": "abc",
>> 
>> "body_data.fild1": null,
>> 
>> "body_data.fild2": null
>> 
>> }


Re: 关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-24 文章 jinzhuguang
感谢大佬,我找到了。
所以说SQL类的内建函数实际上使用的是calcite的能力,而flink自己的内建函数是在table api中使用

> 2023年11月24日 17:07,Xuyang  写道:
> 
> Hi, 
> 关于你举的例子,如果编译了源码的话,可以在FlinkSqlParserImpl这个被动态生成的词法解析器类中找到PostfixRowOperator方法,大致是通过识别到IS
>  NOT NULL这三个关键字,转化为Calcite的这个内置函数SqlStdOperatorTable.IS_NOT_NULL
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
> 在 2023-11-24 15:15:04,"jinzhuguang"  写道:
>> flink 1.18.0
>> 
>> 
>> 例如我写下一条SQL:
>> select * from KafkaTable where id is not null;
>> 
>> IS NOT NULL应该属于系统内建函数,于是我找到相关代码:
>> 
>> public static final BuiltInFunctionDefinition IS_NOT_NULL =
>>   BuiltInFunctionDefinition.newBuilder()
>>   .name("isNotNull")
>>   .kind(SCALAR)
>>   
>> .inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
>>   
>> .outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
>>   .build();
>> 
>> 发现他的name是“ isNotNull”,和“is not null”对应不上。并且经过实际测验,确实证实了我的猜想:
>> 
>> DEBUG org.apache.flink.table.module.ModuleManager  [] - 
>> Cannot find FunctionDefinition 'is not null' from any loaded modules.
>> 
>> 
>> 所以我很疑惑,SQL到底是在哪里找到了”is not null”这个函数的呢?
>> 
>> 以下是调用栈:
>> @org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads()
>>   at 
>> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
>>   at 
>> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutinesByName(SqlUtil.java:609)
>>   at 
>> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:535)
>>   at org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:486)
>>   at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:595)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287)
>>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4341)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4333)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3606)
>>   at 
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
>>   at 
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025)
>>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
>>   at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
>>   at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>>   at 
>> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
>>   at 
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>>   at 
>> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:187)
>>   at 
>> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>>   at 
>> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>>   at 
>> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>>   at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:750)



Re:关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-24 文章 Xuyang
Hi, 
关于你举的例子,如果编译了源码的话,可以在FlinkSqlParserImpl这个被动态生成的词法解析器类中找到PostfixRowOperator方法,大致是通过识别到IS
 NOT NULL这三个关键字,转化为Calcite的这个内置函数SqlStdOperatorTable.IS_NOT_NULL




--

Best!
Xuyang





在 2023-11-24 15:15:04,"jinzhuguang"  写道:
>flink 1.18.0
>
>
>例如我写下一条SQL:
> select * from KafkaTable where id is not null;
>
>IS NOT NULL应该属于系统内建函数,于是我找到相关代码:
>
>public static final BuiltInFunctionDefinition IS_NOT_NULL =
>BuiltInFunctionDefinition.newBuilder()
>.name("isNotNull")
>.kind(SCALAR)
>
> .inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
>
> .outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
>.build();
>
>发现他的name是“ isNotNull”,和“is not null”对应不上。并且经过实际测验,确实证实了我的猜想:
>
>DEBUG org.apache.flink.table.module.ModuleManager  [] - Cannot 
>find FunctionDefinition 'is not null' from any loaded modules.
>
>
>所以我很疑惑,SQL到底是在哪里找到了”is not null”这个函数的呢?
>
>以下是调用栈:
>@org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads()
>at 
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
>at 
> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutinesByName(SqlUtil.java:609)
>at 
> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:535)
>at org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:486)
>at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:595)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287)
>at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4341)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4333)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3606)
>at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
>at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025)
>at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
>at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
>at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>at 
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
>at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:187)
>at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>at 
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:750)


关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-23 文章 jinzhuguang
flink 1.18.0


例如我写下一条SQL:
 select * from KafkaTable where id is not null;

IS NOT NULL应该属于系统内建函数,于是我找到相关代码:

public static final BuiltInFunctionDefinition IS_NOT_NULL =
BuiltInFunctionDefinition.newBuilder()
.name("isNotNull")
.kind(SCALAR)

.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();

发现他的name是“ isNotNull”,和“is not null”对应不上。并且经过实际测验,确实证实了我的猜想:

DEBUG org.apache.flink.table.module.ModuleManager  [] - Cannot 
find FunctionDefinition 'is not null' from any loaded modules.


所以我很疑惑,SQL到底是在哪里找到了”is not null”这个函数的呢?

以下是调用栈:
@org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads()
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
at 
org.apache.calcite.sql.SqlUtil.lookupSubjectRoutinesByName(SqlUtil.java:609)
at 
org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:535)
at org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:486)
at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:595)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4341)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4333)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3606)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:187)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)


Re: flink sql如何实现json字符数据解析?

2023-11-22 文章 jinzhuguang
Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
比如:

SourceT: (
uuid String,
   body_data ARRAY>
)

SinkT (
result ARRAY>
)

Insert into SinkT (result)  select Array[ROW(uuid, null,body_data[1]. field1 as 
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
null,body_data[2]. field, body_data[2]. field2)] as result

希望对你有帮助

> 2023年11月22日 20:54,casel.chen  写道:
> 
> 输入:
> 
> {
> 
>  "uuid":"",
> 
>  "body_data": 
> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
> 
> }
> 
> 
> 
> 
> 输出:
> 
> [
> 
>  {
> 
> "uuid": "",
> 
> "body_data: null,
> 
> "body_data.fild1": "123”,
> 
> "body_data.fild2": "234"
> 
>  },
> 
>  {
> 
> "uuid": "",
> 
> "body_data": null,
> 
> "body_data.fild1": "abc",
> 
> "body_data.fild2": "cdf"
> 
>  }
> 
> ]
> 
> 
> 
> 
> 当格式错误时
> 
> 
> 
> 
> 输入:
> 
> {
> 
> "uuid": "”,
> 
> "body_data": "abc"
> 
> }
> 
> 输出:
> 
> {
> 
> "uuid": "",
> 
> "body_data": "abc",
> 
> "body_data.fild1": null,
> 
> "body_data.fild2": null
> 
> }



flink sql如何实现json字符数据解析?

2023-11-22 文章 casel.chen
输入:

{

  "uuid":"",

  "body_data": 
"[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"

}




输出:

[

  {

"uuid": "",

"body_data: null,

"body_data.fild1": "123”,

"body_data.fild2": "234"

  },

  {

"uuid": "",

"body_data": null,

"body_data.fild1": "abc",

"body_data.fild2": "cdf"

  }

]




当格式错误时




输入:

{

"uuid": "”,

"body_data": "abc"

}

输出:

{

"uuid": "",

"body_data": "abc",

"body_data.fild1": null,

"body_data.fild2": null

}

flink sql支持批量lookup join

2023-11-21 文章 casel.chen
一行数据带了三个待lookup查询的key,分别是key1,key2和key3


id key1 key2 key3
想实现批量lookup查询返回一行数据 id value1 value2 value3


查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示
id key1 key2 key3
先将多列转成多行
id key1
id key2
id key3

分别进行lookup join后得到
id value1
id value2
id value3
最后多行转多列返回一行数据

id value1 value2 value3


上述方案目前我能想到的是通过udtf + udaf来实现,但缺点是不具备通用性。Flink社区打算原生支持么?

Re: flink sql作业如何支持配置流?

2023-11-20 文章 Yu Chen
Hi casel,

我们在生产中有类似的做法,可以考虑实现一个udtf,监听apollo的配置,根据配置选择是否filter数据。

Best,
Yu Chen


> 2023年11月20日 21:05,Xuyang  写道:
> 
> Hi, 
>是否可以将这个”配置维表“换成流表,利用flink cdc,改动这个配置表的时候,监听字段cdc变化,同时下游上流join呢?
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
> 在 2023-11-20 19:24:47,"casel.chen"  写道:
>> 我有一个flink 
>> sql作业过滤条件customer_id是需要根据用户配置来定的,类似于Apollo配置中心,是否可以通过定义一张配置维表来实现呢?设置TTL定期去获取最新配置。
>> 
>> 
>> create table customer_conf_tbl (
>> customer_id STRING
>> ) with (
>> 'connector' = 'apollo',
>> '其他属性' 
>> );
>> select * from biz_table where customer_id in (select 
>> string_split(customer_id, ',') from customer_conf_tbl)
>> 
>> 
>> 如果要做成配置实时更新作用于sql作业的话又该如何实现呢?



flink sql作业如何支持配置流?

2023-11-20 文章 casel.chen
我有一个flink 
sql作业过滤条件customer_id是需要根据用户配置来定的,类似于Apollo配置中心,是否可以通过定义一张配置维表来实现呢?设置TTL定期去获取最新配置。


create table customer_conf_tbl (
  customer_id STRING
) with (
  'connector' = 'apollo',
  '其他属性' 
);
select * from biz_table where customer_id in (select string_split(customer_id, 
',') from customer_conf_tbl)


如果要做成配置实时更新作用于sql作业的话又该如何实现呢?

Flink sql 1.17.1 字段类型 DECIMAL(10, 0) 无法执行sql

2023-11-14 文章 刘聪聪
Flink 1.17.1 遇到  DECIMAL(10, 0)类型字段,直接无法运行,我用强转都不行,还是报数组越界,去除 DECIMAL(10, 
0)类型字段,sql运行都正常。













Re:Re:Re:回复: flink sql不支持show create catalog 吗?

2023-10-30 文章 casel.chen
谢谢解答,我查了一下目前有两种CatalogStore实现,一个是基于内存的,另一个是基于文件系统的。
请问要如何配置基于文件系统的CatalogStore?这个文件可以在对象存储上吗?flink sql client要如何使用这个CatalogStore? 
谢谢!

















在 2023-10-30 10:28:34,"Xuyang"  写道:
>Hi, CatalogStore 的引入我理解是为了Catalog能被更好地管理、注册和元数据存储,具体motivation可以参考Flip295[1].
>我的理解是倒不是说“引入CatalogStore后才可以提供show create 
>catalog语法支持”,而是之前没有直接存储catalog配置的地方和能力,在CatalogStore之后,天然支持了对catalog配置的存储,因此这个feat就可以直接快速的支持了。
>
>
>
>
>[1] 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2023-10-29 20:34:52,"casel.chen"  写道:
>>请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create 
>>catalog语法支持?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2023-10-20 17:03:46,"李宇彬"  写道:
>>>Hi Feng
>>>
>>>
>>>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
>>>| |
>>> 回复的原邮件 
>>>| 发件人 | Feng Jin |
>>>| 发送日期 | 2023年10月20日 13:18 |
>>>| 收件人 |  |
>>>| 主题 | Re: flink sql不支持show create catalog 吗? |
>>>hi casel
>>>
>>>
>>>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>>>
>>>
>>>Best,
>>>Feng
>>>
>>>On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:
>>>
>>>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
>>>sql不支持show create catalog 。
>>>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


Re: flink sql如何处理脏数据问题?

2023-10-29 文章 ying lin
还有一种做法就是使用datastream,datastream支持sideoutput,但 flink
sql不支持,不过有一种迂回的做法就是flinksql -> datastream -> flink
sql,可以查一下官网资料,flinksql和datastream可以互相转换。

Xuyang  于2023年10月30日周一 10:17写道:

> Flink SQL目前对于脏数据没有类似side output的机制来输出,这个需求用自定义connector应该可以实现。
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> 在 2023-10-29 10:23:38,"casel.chen"  写道:
> >场景:使用flink
> sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka
> topic或者写入一个文件便于事后审查。这个目前有办法做到吗?
>


Re:Re:回复: flink sql不支持show create catalog 吗?

2023-10-29 文章 Xuyang
Hi, CatalogStore 的引入我理解是为了Catalog能被更好地管理、注册和元数据存储,具体motivation可以参考Flip295[1].
我的理解是倒不是说“引入CatalogStore后才可以提供show create 
catalog语法支持”,而是之前没有直接存储catalog配置的地方和能力,在CatalogStore之后,天然支持了对catalog配置的存储,因此这个feat就可以直接快速的支持了。




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations




--

Best!
Xuyang





在 2023-10-29 20:34:52,"casel.chen"  写道:
>请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create 
>catalog语法支持?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2023-10-20 17:03:46,"李宇彬"  写道:
>>Hi Feng
>>
>>
>>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
>>| |
>> 回复的原邮件 
>>| 发件人 | Feng Jin |
>>| 发送日期 | 2023年10月20日 13:18 |
>>| 收件人 |  |
>>| 主题 | Re: flink sql不支持show create catalog 吗? |
>>hi casel
>>
>>
>>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>>
>>
>>Best,
>>Feng
>>
>>On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:
>>
>>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
>>sql不支持show create catalog 。
>>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


Re:回复: flink sql不支持show create catalog 吗?

2023-10-29 文章 casel.chen
请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create 
catalog语法支持?

















在 2023-10-20 17:03:46,"李宇彬"  写道:
>Hi Feng
>
>
>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
>| |
> 回复的原邮件 
>| 发件人 | Feng Jin |
>| 发送日期 | 2023年10月20日 13:18 |
>| 收件人 |  |
>| 主题 | Re: flink sql不支持show create catalog 吗? |
>hi casel
>
>
>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>
>
>Best,
>Feng
>
>On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:
>
>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
>sql不支持show create catalog 。
>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


flink sql如何处理脏数据问题?

2023-10-28 文章 casel.chen
场景:使用flink 
sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka
 topic或者写入一个文件便于事后审查。这个目前有办法做到吗?

回复: flink sql不支持show create catalog 吗?

2023-10-20 文章 李宇彬
Hi Feng


我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
| |
 回复的原邮件 
| 发件人 | Feng Jin |
| 发送日期 | 2023年10月20日 13:18 |
| 收件人 |  |
| 主题 | Re: flink sql不支持show create catalog 吗? |
hi casel


从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。


Best,
Feng

On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:

之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
sql不支持show create catalog 。
而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


Re: flink sql不支持show create catalog 吗?

2023-10-19 文章 Feng Jin
hi casel


从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。


Best,
Feng

On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:

> 之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
> sql不支持show create catalog 。
> 而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


flink sql不支持show create catalog 吗?

2023-10-19 文章 casel.chen
之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink 
sql不支持show create catalog 。
而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?

Re: Flink SQL的状态清理

2023-10-17 文章 Jane Chan
Hi, 你好

如果使用的是 standalone session cluster, 想要在 JM/TM 日志中看到参数打印出来, 需要在集群启动前在
flink-conf.yaml 配置 table.exec.state.ttl: '${TTL}', 再启动集群;
集群启动后再修改的话, 日志不会打印出来, 可以通过 SET; 命令查看当前 SQL CLI 中配置的参数.
另外, 需要先执行 SET 'table.exec.state.ttl' = '${TTL}' , 然后提交作业, 可以确认下操作顺序是否有误.

祝好!
Jane

On Mon, Oct 9, 2023 at 6:01 PM 小昌同学  wrote:

> 你好,老师,我也是这样设置的,我这边是flink sql client,但是我去flink web ui界面并没有看到这个配置生效。
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | Jane Chan |
> | 发送日期 | 2023年9月25日 11:24 |
> | 收件人 |  |
> | 主题 | Re: Flink SQL的状态清理 |
> Hi,
>
> 可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86
>
> Best,
> Jane
>
> On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:
>
> 试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s")
>
>
>
>
> | |
> faronzz
> |
> |
> faro...@163.com
> |
>
>
>  回复的原邮件 
> | 发件人 | 小昌同学 |
> | 发送日期 | 2023年09月21日 17:06 |
> | 收件人 | user-zh |
> | 主题 | Flink SQL的状态清理 |
>
>
> 各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>


Re: 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 jinzhuguang
感谢大佬!!!

> 2023年10月13日 10:44,tanjialiang  写道:
> 
> Hi, 
> 这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922
> 
> 
> best wishes,
> tanjialiang.
> 
> 
>  回复的原邮件 
> | 发件人 | jinzhuguang |
> | 发送日期 | 2023年10月13日 10:39 |
> | 收件人 | user-zh |
> | 主题 | 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。 |
> 首先,我的Flink版本为1.16.0
> 为了方便理解,我以Kafka作为案例来描述:
> 我有以下两个表:
> CREATE TABLE orders(
> user_id BIGINT,
> name STRING,
> timestamp TIMESTAMP(3) METADATA VIRTUAL
> )WITH(
> 'connector'='kafka',
> 'topic'='orders',
> 'properties.group.id' = 'test_join_tempral',
> 'scan.startup.mode'='earliest-offset',
> 'properties.bootstrap.servers'='localhost:9092',
> 'format'='json',
> 'json.ignore-parse-errors' = 'true'
> );
> CREATE TABLE kafka_sink(
> user_id BIGINT,
> name STRING,
> timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
> )WITH(
> 'connector'='kafka',
> 'topic'='kafka_sink',
> 'properties.group.id' = 'test_join_tempral',
> 'scan.startup.mode'='earliest-offset',
> 'properties.bootstrap.servers'='localhost:9092',
> 'format'='json',
> 'json.ignore-parse-errors' = 'true'
> );
> 
> 正常情况:
> Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders;
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] SQL update statement has been successfully submitted to the cluster:
> Job ID: e419ae9d2cad4c3c2a2c1150c1a86653
> 
> 
> 异常情况:
> Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select 
> user_id,name,`timestamp` from orders;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 
> 'timestamp'
> 很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下:
> Flink SQL> describe kafka_sink;
> +---+--+--+-+---+---+
> |  name | type | null | key |extras | 
> watermark |
> +---+--+--+-+---+---+
> |   user_id |   BIGINT | TRUE | |   | 
>   |
> |  name |   STRING | TRUE | |   | 
>   |
> | timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' | 
>   |
> +---+--+--+-+---+---+
> 
> 
> 
> 恳请解答!



回复:关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 tanjialiang
Hi, 
这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922


best wishes,
tanjialiang.


 回复的原邮件 
| 发件人 | jinzhuguang |
| 发送日期 | 2023年10月13日 10:39 |
| 收件人 | user-zh |
| 主题 | 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。 |
首先,我的Flink版本为1.16.0
为了方便理解,我以Kafka作为案例来描述:
我有以下两个表:
CREATE TABLE orders(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA VIRTUAL
)WITH(
'connector'='kafka',
'topic'='orders',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE kafka_sink(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
)WITH(
'connector'='kafka',
'topic'='kafka_sink',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);

正常情况:
Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e419ae9d2cad4c3c2a2c1150c1a86653


异常情况:
Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select 
user_id,name,`timestamp` from orders;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 
'timestamp'
很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下:
Flink SQL> describe kafka_sink;
+---+--+--+-+---+---+
|  name | type | null | key |extras | watermark 
|
+---+--+--+-+---+---+
|   user_id |   BIGINT | TRUE | |   |   
|
|  name |   STRING | TRUE | |   |   
|
| timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' |   
|
+---+--+--+-+---+---+



恳请解答!

关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 jinzhuguang
首先,我的Flink版本为1.16.0
为了方便理解,我以Kafka作为案例来描述:
我有以下两个表:
CREATE TABLE orders(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA VIRTUAL
)WITH(
'connector'='kafka',
'topic'='orders',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE kafka_sink(
user_id BIGINT,
name STRING,
timestamp TIMESTAMP(3) METADATA FROM 'timestamp'
)WITH(
'connector'='kafka',
'topic'='kafka_sink',
'properties.group.id' = 'test_join_tempral',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='localhost:9092',
'format'='json',
'json.ignore-parse-errors' = 'true'
);

正常情况: 
Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e419ae9d2cad4c3c2a2c1150c1a86653


异常情况:
Flink SQL> insert into kafka_sink(user_id,name,`timestamp`) select 
user_id,name,`timestamp` from orders;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown target column 
'timestamp'
很奇怪,为什么指定列名就不行了呢?而且还是识别不到”ts”列,kafka_sink schema如下:
Flink SQL> describe kafka_sink;
+---+--+--+-+---+---+
|  name | type | null | key |extras | watermark 
|
+---+--+--+-+---+---+
|   user_id |   BIGINT | TRUE | |   |   
|
|  name |   STRING | TRUE | |   |   
|
| timestamp | TIMESTAMP(3) | TRUE | | METADATA FROM 'timestamp' |   
|
+---+--+--+-+---+---+



恳请解答!

回复: Flink SQL的状态清理

2023-10-09 文章 小昌同学
你好,老师,我也是这样设置的,我这边是flink sql client,但是我去flink web ui界面并没有看到这个配置生效。


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jane Chan |
| 发送日期 | 2023年9月25日 11:24 |
| 收件人 |  |
| 主题 | Re: Flink SQL的状态清理 |
Hi,

可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86

Best,
Jane

On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:

试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s")




| |
faronzz
|
|
faro...@163.com
|


 回复的原邮件 
| 发件人 | 小昌同学 |
| 发送日期 | 2023年09月21日 17:06 |
| 收件人 | user-zh |
| 主题 | Flink SQL的状态清理 |


各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
| |
小昌同学
|
|
ccc0606fight...@163.com
|


Re: Flink SQL的状态清理

2023-09-24 文章 Jane Chan
Hi,

可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86

Best,
Jane

On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:

> 试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s")
>
>
>
>
> | |
> faronzz
> |
> |
> faro...@163.com
> |
>
>
>  回复的原邮件 
> | 发件人 | 小昌同学 |
> | 发送日期 | 2023年09月21日 17:06 |
> | 收件人 | user-zh |
> | 主题 | Flink SQL的状态清理 |
>
>
> 各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


回复:Flink SQL的状态清理

2023-09-21 文章 faronzz
试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s")




| |
faronzz
|
|
faro...@163.com
|


 回复的原邮件 
| 发件人 | 小昌同学 |
| 发送日期 | 2023年09月21日 17:06 |
| 收件人 | user-zh |
| 主题 | Flink SQL的状态清理 |


各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
| |
小昌同学
|
|
ccc0606fight...@163.com
|

Flink SQL的状态清理

2023-09-21 文章 小昌同学


各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
| |
小昌同学
|
|
ccc0606fight...@163.com
|

Re: flink sql语句转成底层处理函数

2023-08-28 文章 Feng Jin
Loglevel 设置为 debug 之后,可以看到具体的 codegen 的代码。

On Mon, Aug 28, 2023 at 1:25 PM 海风 <18751805...@163.com> wrote:

> 嗯,执行计划确实可以看到一些信息,只是还想知道是否还有比较好的方式能看具体有哪些底层函数以及状态,从而更方便去分析性能相关问题的
>
>
>
>  回复的原邮件 
> | 发件人 | Shammon FY |
> | 日期 | 2023年08月28日 12:05 |
> | 收件人 | user-zh@flink.apache.org |
> | 抄送至 | |
> | 主题 | Re: flink sql语句转成底层处理函数 |
> 如果想看一个sql被转换后包含哪些具体执行步骤,可以通过explain语法[1]查看执行计划
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/
>
> On Sun, Aug 27, 2023 at 5:23 PM 海风 <18751805...@163.com> wrote:
>
> > 请教下,是否可以去查询一个flink
> > sql提交运行后,flink给它转成的底层处理函数到底是什么样的,假如涉及状态计算,flink给这个sql定义的状态变量是哪些呢?
> >
> >
> >
>


回复:flink sql语句转成底层处理函数

2023-08-27 文章 海风
嗯,执行计划确实可以看到一些信息,只是还想知道是否还有比较好的方式能看具体有哪些底层函数以及状态,从而更方便去分析性能相关问题的



 回复的原邮件 
| 发件人 | Shammon FY |
| 日期 | 2023年08月28日 12:05 |
| 收件人 | user-zh@flink.apache.org |
| 抄送至 | |
| 主题 | Re: flink sql语句转成底层处理函数 |
如果想看一个sql被转换后包含哪些具体执行步骤,可以通过explain语法[1]查看执行计划

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/

On Sun, Aug 27, 2023 at 5:23 PM 海风 <18751805...@163.com> wrote:

> 请教下,是否可以去查询一个flink
> sql提交运行后,flink给它转成的底层处理函数到底是什么样的,假如涉及状态计算,flink给这个sql定义的状态变量是哪些呢?
>
>
>


Re: flink sql语句转成底层处理函数

2023-08-27 文章 Shammon FY
如果想看一个sql被转换后包含哪些具体执行步骤,可以通过explain语法[1]查看执行计划

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/

On Sun, Aug 27, 2023 at 5:23 PM 海风 <18751805...@163.com> wrote:

> 请教下,是否可以去查询一个flink
> sql提交运行后,flink给它转成的底层处理函数到底是什么样的,假如涉及状态计算,flink给这个sql定义的状态变量是哪些呢?
>
>
>


flink sql语句转成底层处理函数

2023-08-27 文章 海风
请教下,是否可以去查询一个flink 
sql提交运行后,flink给它转成的底层处理函数到底是什么样的,假如涉及状态计算,flink给这个sql定义的状态变量是哪些呢?




Re: flink1.17.1版本 flink sql多表关联优化

2023-08-24 文章 xiaohui zhang
这种join写法会随时更新里面每一个字段,最终产出结果的业务含义是什么呢?
如果是取每个vehicle_code对应的最新统计指标值,是否可以用支持partial update的存储,用多个单独的sql直接写入目前就可以了

周先明  于2023年8月4日周五 11:01写道:

> Regular Join 默认把数据都存储在State中,通常会结合TTL来进行优化
>
> guanyq  于2023年8月3日周四 15:59写道:
>
> > 请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式
> >
> > select
> > date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time,
> > b.vehicle_code,
> > a.item_name,
> > a.item_value,
> > c.item_value as vehicle_score,
> > d.current_fault,
> > e.history_fault,
> > f.late_mileage,
> > g.fault_level_event_count,
> > h.current_fault_subsystem,
> > i.history_fault_subsystem
> > from fault_record_subsystem a
> > join mtr_vehicle_use b on a.vehicle_id = b.vehicle_id
> > join fault_record_vehicle c on a.vehicle_id = c.vehicle_id
> > join fault_record_current_count d on a.vehicle_id = d.vehicle_id
> > join fault_record_history_count e on a.vehicle_id = e.vehicle_id
> > join vehicle_usage_score f on a.vehicle_id = f.vehicle_id
> > join fault_record_level_event_count g on a.vehicle_id = g.vehicle_id
> > join fault_record_current_count_subsystem h on a.vehicle_id =
> h.vehicle_id
> > and a.item_name = h.item_name
> > join fault_record_history_count_subsystem i on a.vehicle_id =
> i.vehicle_id
> > and a.item_name = i.item_name
>


Re: flink sql作业状态跨存储系统迁移问题

2023-08-18 文章 Tianwang Li
可以 savepoint 到 HDFS,然后配置 checkpoint 的地址为 对象存储。

我们就是 flink 支持对象存储和 HDFS。

Hangxiang Yu  于2023年8月2日周三 14:03写道:

> Hi, 我理解可以有两种方式:
> 1. 设定从某个存储集群上恢复并向另一个存储集群上快照,即设置[1]为 HDFS地址,[2] 为后面的对象存储地址
> 2. 还是在HDFS集群上启停作业,设置 savepoint 目录[3]到对象存储
>
> 关于 state processor api,目前 sql 作业确实操作起来比较困难,只能从日志里获取 uid 等信息,以及理解 sql
> 实际产生的状态才能使用;
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#execution-savepoint-path
> [2]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#state-checkpoints-dir
> [3]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#state-savepoints-dir
>
> On Sat, Jul 29, 2023 at 11:09 AM casel.chen  wrote:
>
> > 我们要将当前在Hadoop Yarn上运行的flink
> > sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。
> > 又因为flink作业状态文件内容中包含有绝对路径,所以不能通过物理直接复制文件的办法实现。
> >
> >
> > 查了一下官网flink state processor api目前读取状态需要传参uid和flink状态类型,但问题是flink
> > sql作业的uid是自动生成的,状态类型我们也无法得知,请问有没有遍历目录下保存的所有状态并将其另存到另一个文件系统目录下的API ?
> 感觉state
> > processor api更适合stream api写的作业,sql作业几乎无法处理。是这样么?
>
>
>
> --
> Best,
> Hangxiang.
>


-- 
**
 tivanli
**


Re: flink1.17.1版本 flink sql多表关联优化

2023-08-03 文章 周先明
Regular Join 默认把数据都存储在State中,通常会结合TTL来进行优化

guanyq  于2023年8月3日周四 15:59写道:

> 请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式
>
> select
> date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time,
> b.vehicle_code,
> a.item_name,
> a.item_value,
> c.item_value as vehicle_score,
> d.current_fault,
> e.history_fault,
> f.late_mileage,
> g.fault_level_event_count,
> h.current_fault_subsystem,
> i.history_fault_subsystem
> from fault_record_subsystem a
> join mtr_vehicle_use b on a.vehicle_id = b.vehicle_id
> join fault_record_vehicle c on a.vehicle_id = c.vehicle_id
> join fault_record_current_count d on a.vehicle_id = d.vehicle_id
> join fault_record_history_count e on a.vehicle_id = e.vehicle_id
> join vehicle_usage_score f on a.vehicle_id = f.vehicle_id
> join fault_record_level_event_count g on a.vehicle_id = g.vehicle_id
> join fault_record_current_count_subsystem h on a.vehicle_id = h.vehicle_id
> and a.item_name = h.item_name
> join fault_record_history_count_subsystem i on a.vehicle_id = i.vehicle_id
> and a.item_name = i.item_name


flink1.17.1版本 flink sql多表关联优化

2023-08-03 文章 guanyq
请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式

select
date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time,
b.vehicle_code,
a.item_name,
a.item_value,
c.item_value as vehicle_score,
d.current_fault,
e.history_fault,
f.late_mileage,
g.fault_level_event_count,
h.current_fault_subsystem,
i.history_fault_subsystem
from fault_record_subsystem a
join mtr_vehicle_use b on a.vehicle_id = b.vehicle_id
join fault_record_vehicle c on a.vehicle_id = c.vehicle_id
join fault_record_current_count d on a.vehicle_id = d.vehicle_id
join fault_record_history_count e on a.vehicle_id = e.vehicle_id
join vehicle_usage_score f on a.vehicle_id = f.vehicle_id
join fault_record_level_event_count g on a.vehicle_id = g.vehicle_id
join fault_record_current_count_subsystem h on a.vehicle_id = h.vehicle_id and 
a.item_name = h.item_name
join fault_record_history_count_subsystem i on a.vehicle_id = i.vehicle_id and 
a.item_name = i.item_name

Re: flink sql作业状态跨存储系统迁移问题

2023-08-02 文章 Hangxiang Yu
Hi, 我理解可以有两种方式:
1. 设定从某个存储集群上恢复并向另一个存储集群上快照,即设置[1]为 HDFS地址,[2] 为后面的对象存储地址
2. 还是在HDFS集群上启停作业,设置 savepoint 目录[3]到对象存储

关于 state processor api,目前 sql 作业确实操作起来比较困难,只能从日志里获取 uid 等信息,以及理解 sql
实际产生的状态才能使用;

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#execution-savepoint-path
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#state-checkpoints-dir
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#state-savepoints-dir

On Sat, Jul 29, 2023 at 11:09 AM casel.chen  wrote:

> 我们要将当前在Hadoop Yarn上运行的flink
> sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。
> 又因为flink作业状态文件内容中包含有绝对路径,所以不能通过物理直接复制文件的办法实现。
>
>
> 查了一下官网flink state processor api目前读取状态需要传参uid和flink状态类型,但问题是flink
> sql作业的uid是自动生成的,状态类型我们也无法得知,请问有没有遍历目录下保存的所有状态并将其另存到另一个文件系统目录下的API ? 感觉state
> processor api更适合stream api写的作业,sql作业几乎无法处理。是这样么?



-- 
Best,
Hangxiang.


flink sql作业状态跨存储系统迁移问题

2023-07-28 文章 casel.chen
我们要将当前在Hadoop Yarn上运行的flink 
sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。
又因为flink作业状态文件内容中包含有绝对路径,所以不能通过物理直接复制文件的办法实现。


查了一下官网flink state processor api目前读取状态需要传参uid和flink状态类型,但问题是flink 
sql作业的uid是自动生成的,状态类型我们也无法得知,请问有没有遍历目录下保存的所有状态并将其另存到另一个文件系统目录下的API ? 感觉state 
processor api更适合stream api写的作业,sql作业几乎无法处理。是这样么?

flink sql 传参数问题

2023-07-12 文章 1
Hello:
  请教2个问题。
   1、flink 使用sql-client.sh -f xx.sql 怎么传递参数修改sql里面的文件。比如MySQL,Kafka的连接地址。
   2、flink sql消费Kafka 
设置group-offset,group.id之前没提交过,会直接报错。怎么设置成没提交过从earliest消费等等。
 感谢大家








Re: flink on native k8s里如何使用flink sql gateway

2023-07-05 文章 Shammon FY
Hi,

我们的做法是启动Flink集群后,在其他节点(pod或者独立启动)启动Sql-Gateway,通过Flink的地址远程连接Flink集群,这样Sql-Gateway的部署和Flink集群完全分开

Best,
Shammon FY


On Tue, Jul 4, 2023 at 10:52 AM chaojianok  wrote:

> 大家好,请教个问题。
>
> 用native kubernetes方式在k8s集群上部署好了flink,现在需要在这个flink集群里使用flink sql
> gateway,大家有什么好的方案吗?
> 目前的做法是,进入pod里启动sql gateway,然后在k8s创建flink-sql-gateway
> service,这样就可以通过这个service来访问sql
> gateway了,但是这个方法有个问题,部署过程中必需进入pod启服务,这是不利于自动化部署的,具体的操作命令如下,大家帮忙看看有没有好的解决方案来避免这个问题。
>
> 1、创建flink集群
> ./bin/kubernetes-session.sh \
> -Dkubernetes.cluster-id=flink-cluster \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.service-account=flink-service-account \
> -Dkubernetes.rest-service.exposed.type=NodePort
>
> 2、进入pod通过 ./bin/sql-gateway.sh start
> -Dsql-gateway.endpoint.rest.address=localhost 启动sql gateway服务,退出pod
>
> 3、创建flink-sql-gateway service
> kubectl expose deployment flink-cluster --type=NodePort --port=8083
> --name=flink-sql-gateway -n flink
>


Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-15 文章 daniel sun
退订

On Thu, Jun 15, 2023 at 7:23 PM im huzi  wrote:

> 退订
>
> On Tue, Jun 13, 2023 at 08:51 casel.chen  wrote:
>
> > 线上跑了200多个flink
> >
> sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。
> > flink
> >
> sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称,
> > 请问这个问题有什么好的办法解决吗?
>


Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-15 文章 im huzi
退订

On Tue, Jun 13, 2023 at 08:51 casel.chen  wrote:

> 线上跑了200多个flink
> sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。
> flink
> sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称,
> 请问这个问题有什么好的办法解决吗?


Re: Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-14 文章 Feng Jin
配置参数之后, task name 也会简化.


Best,
Feng

On Wed, Jun 14, 2023 at 11:23 AM casel.chen  wrote:

>
>
>
>
>
>
>
>
>
>
>
>
> 谢谢,除了operator name,我看了flink sql作业生成的task name也很长,目前有办法可以简化下吗?例如
>
>
> flink_taskmanager_job_task_operator_fetch_total{job_id="4c24ce399f369ba2b7ae5ce51ec034d3",task_id="5c4ca2fea30dcf09bf3ee40c495fe808",task_attempt_id="5110227bf582bd21ecf6102625fadc16",host="172_19_197_35",operator_id="5c4ca2fea30dcf09bf3ee40c495fe808",operator_name="Source:_TableSourceScan_table___hive__default__top_trans_orderfields__acc_sp",task_name="Source:_TableSourceScan_table___hive__default__top_trans_orderfields__acc_split_bunch__acct_code__acct_fee_date__acct_finish_time__acct_id__acct_message__acct_stat__acct_trans_date__acct_trans_id__acqr_inst_id__actual_pay_channel__actual_pay_channel_sub_mer_id__agent_id__area_info__atu_sub_mer_id__auth_flag__auth_no__bagent_id__bagent_name__bank_date__bank_id__bank_mer_id__bank_mer_name__bank_name__bank_resp_code__bank_resp_desc__bank_seq_id__bank_term_id__bank_type__batch_id__busscode__busstype__card_bank_id__card_channel_type__card_sign__cash_req_date__cash_resp_code__cash_resp_desc__cash_trans_id__cashier_amt__cashier_version__channel_code__channel_finish_time__channel_message__channel_stat__channel_type__check_cash_date__check_cash_flag__chk_time__close_trans_stat__cloud_pay__correct_stat__create_time__creator__credit_fee_amt__credit_type__db_unit__dc_response__dc_type__debit_fee_amt__debit_fee_formula__dev_type__devs_id__discount_amt__div_info__double_exempt__double_limit_amt__fee_acct_id__fee_allowance_flag__fee_amt__fee_flag__fee_formula__fee_huifu_id__fee_member_id__fee_real_acct_id__fee_real_cust_id__fee_rec_type__fee_source__fee_split_type__fq_fee_amt__fq_mer_discount_flag__fq_ref_fee_amt__gate_id__goods_desc__helipay_fee_account_amt__helipay_fee_rate__hf_seq_id__huifu_id__icc_data__id__is_acct_div__is_acct_div_param__is_delay_acct__is_deleted__is_route__iss_inst_id__labels__lc__market_flag__maze_bg_date__maze_bg_seq_id__maze_pnr_dev_id__maze_resp_code__maze_resp_desc__mcc__mer_info__mer_name__mer_oper_id__mer_ord_id__mer_priv__modifier__modify_time__mypaytsf_discount__network__oper_type__ord_amt__ord_id__org_acct_id__org_auth_code__org_auth_no__org_huifu_seq_id__org_ord_id__org_trans_date__out_ord_id__out_trans_id__pa_mer_id__pa_product_id__pa_trans_id__par__party_order_id__pay_amt__pay_card_id__pay_card_id_enc__pay_channel__pay_channel_id__pay_scene__pay_type__pnr_dev_id__pos_mer_id__pos_mer_name__pos_term_id__posp_seq_id__product_id__promotion_detail__real_acct_id__real_cust_id__real_gate_id__real_pay_type__ref_amt__ref_cnt__ref_fee_amt__ref_num__region_id__remark__req_date__req_seq_id__route_mer_id__route_region_id__route_terminal_id__send_time__settle_amt__settle_trans_stat__shop_name__sn_code__source_region_id__subsidy_amt__subsidy_ref_amt__subsidy_stat__sys_id__sys_trace_audit_num__term_batch_id__term_div_coupon_type__time_expire__trans_close_notify_url__trans_date__trans_finish_time__trans_notify_url__trans_stat__trans_type__un_scene_info__unconfirm_amt__unconfirm_fee_amt__version__Calc_select__huifu_id__trans_dateUTF_16LE_top:base:channel_merch_product_relationhuifu_idUTF_16LE__product_id__AS__f184___whereproduct_id_UTF_16LE_MCS_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___AND__trans_statUTF_16LE_S_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE_LookupJoin_table__hive_default_redis_dim_channel_merch_product_relation___joinType__InnerJoin___async__false___lookup__product_id__f184___where___channel_id_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE_select__huifu_id__trans_date___f184__product_id__channel_id__Calc_select__trans_date__channel_id__UTF_16LE_top:base:org_infohuifu_id__AS__f189__LookupJoin_table__hive_default_redis_dim_org_info___joinType__LeftOuterJoin___async__false___lookup__org_cust_id__f189___select__trans_date__channel_id___f189__org_cust_id__huifu_fst_org__huifu_sec_org__huifu_thd_org__huifu_for_org__huifu_sales_sub__Calc_select__trans_date_AS_transDate__channel_id_AS_serviceId__CASE__huifu_fst_org_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_fst_org_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_fst_org_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_fst_org___AS_huifuFstOrg__CASE__huifu_sec_org_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_sec_org_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_sec_org_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_sec_

Re:Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-13 文章 casel.chen












谢谢,除了operator name,我看了flink sql作业生成的task name也很长,目前有办法可以简化下吗?例如


flink_taskmanager_job_task_operator_fetch_total{job_id="4c24ce399f369ba2b7ae5ce51ec034d3",task_id="5c4ca2fea30dcf09bf3ee40c495fe808",task_attempt_id="5110227bf582bd21ecf6102625fadc16",host="172_19_197_35",operator_id="5c4ca2fea30dcf09bf3ee40c495fe808",operator_name="Source:_TableSourceScan_table___hive__default__top_trans_orderfields__acc_sp",task_name="Source:_TableSourceScan_table___hive__default__top_trans_orderfields__acc_split_bunch__acct_code__acct_fee_date__acct_finish_time__acct_id__acct_message__acct_stat__acct_trans_date__acct_trans_id__acqr_inst_id__actual_pay_channel__actual_pay_channel_sub_mer_id__agent_id__area_info__atu_sub_mer_id__auth_flag__auth_no__bagent_id__bagent_name__bank_date__bank_id__bank_mer_id__bank_mer_name__bank_name__bank_resp_code__bank_resp_desc__bank_seq_id__bank_term_id__bank_type__batch_id__busscode__busstype__card_bank_id__card_channel_type__card_sign__cash_req_date__cash_resp_code__cash_resp_desc__cash_trans_id__cashier_amt__cashier_version__channel_code__channel_finish_time__channel_message__channel_stat__channel_type__check_cash_date__check_cash_flag__chk_time__close_trans_stat__cloud_pay__correct_stat__create_time__creator__credit_fee_amt__credit_type__db_unit__dc_response__dc_type__debit_fee_amt__debit_fee_formula__dev_type__devs_id__discount_amt__div_info__double_exempt__double_limit_amt__fee_acct_id__fee_allowance_flag__fee_amt__fee_flag__fee_formula__fee_huifu_id__fee_member_id__fee_real_acct_id__fee_real_cust_id__fee_rec_type__fee_source__fee_split_type__fq_fee_amt__fq_mer_discount_flag__fq_ref_fee_amt__gate_id__goods_desc__helipay_fee_account_amt__helipay_fee_rate__hf_seq_id__huifu_id__icc_data__id__is_acct_div__is_acct_div_param__is_delay_acct__is_deleted__is_route__iss_inst_id__labels__lc__market_flag__maze_bg_date__maze_bg_seq_id__maze_pnr_dev_id__maze_resp_code__maze_resp_desc__mcc__mer_info__mer_name__mer_oper_id__mer_ord_id__mer_priv__modifier__modify_time__mypaytsf_discount__network__oper_type__ord_amt__ord_id__org_acct_id__org_auth_code__org_auth_no__org_huifu_seq_id__org_ord_id__org_trans_date__out_ord_id__out_trans_id__pa_mer_id__pa_product_id__pa_trans_id__par__party_order_id__pay_amt__pay_card_id__pay_card_id_enc__pay_channel__pay_channel_id__pay_scene__pay_type__pnr_dev_id__pos_mer_id__pos_mer_name__pos_term_id__posp_seq_id__product_id__promotion_detail__real_acct_id__real_cust_id__real_gate_id__real_pay_type__ref_amt__ref_cnt__ref_fee_amt__ref_num__region_id__remark__req_date__req_seq_id__route_mer_id__route_region_id__route_terminal_id__send_time__settle_amt__settle_trans_stat__shop_name__sn_code__source_region_id__subsidy_amt__subsidy_ref_amt__subsidy_stat__sys_id__sys_trace_audit_num__term_batch_id__term_div_coupon_type__time_expire__trans_close_notify_url__trans_date__trans_finish_time__trans_notify_url__trans_stat__trans_type__un_scene_info__unconfirm_amt__unconfirm_fee_amt__version__Calc_select__huifu_id__trans_dateUTF_16LE_top:base:channel_merch_product_relationhuifu_idUTF_16LE__product_id__AS__f184___whereproduct_id_UTF_16LE_MCS_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___AND__trans_statUTF_16LE_S_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE_LookupJoin_table__hive_default_redis_dim_channel_merch_product_relation___joinType__InnerJoin___async__false___lookup__product_id__f184___where___channel_id_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE_select__huifu_id__trans_date___f184__product_id__channel_id__Calc_select__trans_date__channel_id__UTF_16LE_top:base:org_infohuifu_id__AS__f189__LookupJoin_table__hive_default_redis_dim_org_info___joinType__LeftOuterJoin___async__false___lookup__org_cust_id__f189___select__trans_date__channel_id___f189__org_cust_id__huifu_fst_org__huifu_sec_org__huifu_thd_org__huifu_for_org__huifu_sales_sub__Calc_select__trans_date_AS_transDate__channel_id_AS_serviceId__CASE__huifu_fst_org_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_fst_org_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_fst_org_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_fst_org___AS_huifuFstOrg__CASE__huifu_sec_org_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_sec_org_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_sec_org_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_sec_org___AS_huifuSecOrg__CASE__huifu_thd_org_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_thd_org_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR

Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-12 文章 Feng Jin
hi casel

1. 可以考虑使用 Flink1.15, 使用精简的 operator name

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/#table-exec-simplify-operator-name-enabled

2.  Flink 也提供了 restful 接口直接获取瞬时的 metric,如果不需要历史的 metric

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-metrics


Best,
Feng

On Tue, Jun 13, 2023 at 8:51 AM casel.chen  wrote:

> 线上跑了200多个flink
> sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。
> flink
> sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称,
> 请问这个问题有什么好的办法解决吗?


flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-12 文章 casel.chen
线上跑了200多个flink 
sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。
flink 
sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称,
请问这个问题有什么好的办法解决吗?

Flink SQL对同一kafka source进行多sink操作时会报javax.management.InstanceAlreadyExistsException异常

2023-06-02 文章 Jeff
sql示例:
create table kafka_source() with ('connector'='kafka');
insert into sink_table1 select * from kafka_source;
insert into sink_table2 select * from kafka_source;




报错内容如下:
javax.management.InstanceAlreadyExistsException: 
kafka.admin.client:type=app-info,id=jxqy_customer_service-enumerator-admin-client
\tat 
java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
\tat 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
\tat 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
\tat 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
\tat 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
\tat 
java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
\tat 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
\tat 
org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500)
\tat 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
\tat org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
\tat org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
\tat 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
\tat 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151)
\tat 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209)
\tat 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
\tat 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
\tat 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
\tat 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
\tat 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
\tat java.base/java.lang.Thread.run(Thread.java:829)

Re: 用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

2023-05-26 文章 Shammon FY
Hi

可以将天级时间和其他需要聚合的字段组成key,使用聚合算子,默认会每条数据完成计算后实时输出结果

Best,
Shammon FY

On Fri, May 26, 2023 at 3:44 PM casel.chen  wrote:

> 用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?


用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

2023-05-26 文章 casel.chen
用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

Re: 使用flink sql创建版本视图无法正常使用

2023-05-17 文章 Shammon FY
Hi,

你邮件里的图片无法显示,也没办法看到具体的错误信息

Best,
Shammon FY


On Thu, May 18, 2023 at 10:15 AM arkey w  wrote:

> flink版本:1.14.5
> 在项目使用版本表时,准备使用版本视图,但创建后无法正常使用。后根据官网提供的示例(  Versioned Tables | Apache Flink
> 
> )进行验证也同样无法使用,创建sql如下:
> 创建事实表:
> [image: image.png]
>
> 创建版本视图:
> [image: image.png]
> [image: image.png]
>
>
> Temporal Join的结果出现了报错:
> [image: image.png]
>
> 在desc视图的时候发现视图并没有主键以及事件时间字段,而join的时候也因此报了错。
> 是我操作哪里有问题吗,要如何才能正确使用版本视图?
>
>


使用flink sql创建版本视图无法正常使用

2023-05-17 文章 arkey w
flink版本:1.14.5
在项目使用版本表时,准备使用版本视图,但创建后无法正常使用。后根据官网提供的示例(  Versioned Tables | Apache Flink

)进行验证也同样无法使用,创建sql如下:
创建事实表:
[image: image.png]

创建版本视图:
[image: image.png]
[image: image.png]


Temporal Join的结果出现了报错:
[image: image.png]

在desc视图的时候发现视图并没有主键以及事件时间字段,而join的时候也因此报了错。
是我操作哪里有问题吗,要如何才能正确使用版本视图?


flink sql case when 中文数据写入doris出现乱码

2023-05-17 文章 casel.chen
使用flink sql写mysql表数据到doris表,发现case 
when语句判断交易类型使用了中文,写入后在doris查出是乱码,而mysql其他中文字段写入是正确的,想问一下这个sql中出现的乱码问题要解决?

回复:Flink SQL CEP如何处理双(多)流输入?

2023-05-12 文章 CloudFunny
双流join?
 回复的原邮件 
| 发件人 | casel.chen |
| 发送日期 | 2023年05月12日 11:52 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | Flink SQL CEP如何处理双(多)流输入? |
请问Flink SQL CEP只能处理单流输入吗?网上看到的例子都是在同一个输入流中进行CEP处理,有没有双(多)输入流下使用CEP处理的例子?谢谢!

Flink SQL CEP如何处理双(多)流输入?

2023-05-11 文章 casel.chen
请问Flink SQL CEP只能处理单流输入吗?网上看到的例子都是在同一个输入流中进行CEP处理,有没有双(多)输入流下使用CEP处理的例子?谢谢!

Re: 使用Flink SQL如何实现支付对帐超时告警?

2023-05-10 文章 Hongshun Wang
Hi  casel.chen,
我理解你的意思是:
希望在ThirdPartyPaymentStream一条数据达到的30分钟后,*再触发查询*
,如果此时该数据在PlatformPaymentStream中还未出现,说明超时未支付,则输入到下游。而不是等ThirdPartyPaymentStream数据达到时再判断是否超时,因为此时虽然超时达到,但是也算已支付,没必要再触发报警了。

如果是流计算,可以采用timer定时器延时触发。

对于sql, 我个人的一个比较绕的想法是(供参考,不一定对):是通过Pulsar
Sink(或RocketMQ等有延迟队列的消息中间件)将PlatformPaymentStream的数据写入延迟队列(30分钟)[1],
然后延迟消费为PlatformPaymentStream2。然后将PlatformPaymentStream2 *left join*
ThirdPartyPaymentStream, 如果join后的结果不包含ThirdPartyPaymentStream部分,说明没有及时付款。

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/pulsar/#%e6%b6%88%e6%81%af%e5%bb%b6%e6%97%b6%e5%8f%91%e9%80%81

Best
Hongshun

On Wed, May 10, 2023 at 8:45 AM Shammon FY  wrote:

> Hi
>
> 如果使用CEP,可以将两个流合并成一个流,然后通过subtype根据不同的事件类型来匹配,定义CEP的Pattern,例如以下这种
> DataStream s1 = ...;
> DataStream s2 = ...;
> DataStream s = s1.union(s1)...;
> Pattern = Pattern.begin("first")
> .subtype(E1.class)
> .where(...)
> .followedBy("second")
>     .subtype(E2.class)
> .where(...)
>
> 如果使用Flink SQL,可以直接使用双流Join+窗口实现
>
> Best,
> Shammon FY
>
>
>
>
> On Wed, May 10, 2023 at 2:24 AM casel.chen  wrote:
>
> > 需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink
> > SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。
> > 请问这个双流实时对帐场景使用Flink CEP SQL要如何实现?
> >
> >
> 网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。
>


Re: 使用Flink SQL如何实现支付对帐超时告警?

2023-05-09 文章 Shammon FY
Hi

如果使用CEP,可以将两个流合并成一个流,然后通过subtype根据不同的事件类型来匹配,定义CEP的Pattern,例如以下这种
DataStream s1 = ...;
DataStream s2 = ...;
DataStream s = s1.union(s1)...;
Pattern = Pattern.begin("first")
.subtype(E1.class)
.where(...)
.followedBy("second")
.subtype(E2.class)
    .where(...)

如果使用Flink SQL,可以直接使用双流Join+窗口实现

Best,
Shammon FY




On Wed, May 10, 2023 at 2:24 AM casel.chen  wrote:

> 需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink
> SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。
> 请问这个双流实时对帐场景使用Flink CEP SQL要如何实现?
>
> 网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。


使用Flink SQL如何实现支付对帐超时告警?

2023-05-09 文章 casel.chen
需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。
请问这个双流实时对帐场景使用Flink CEP SQL要如何实现?
网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。

flink sql canal json格式侧输出parse error记录问题

2023-05-06 文章 casel.chen
线上使用flink sql消费kafka topic canal json格式数据,发现有一些数据中有的时间字段值为-00-00 
00:00:00无法被解析,于是加了'canal-json.ignore-parse-errors = true' 
参数,作业是能够正常运行了,但同时我们也希望知道哪些数据解析失败以便发给上游业务系统去自查。想问一下除了ignore外,有办法将这些parse 
error数据输出到另外一个kafka topic吗?谢谢!

Re: flink sql的codegen导致metaspace OOM疑问

2023-03-29 文章 Shammon FY
Hi

自增id可以为同一个作业的多个codegen类生成唯一类名
一般metaspace可以通过fullgc释放,你可以查看你的集群metaspace大小,是否触发了了fullgc

Best,
Shammon FY

On Wednesday, March 29, 2023, tanjialiang  wrote:

> Hi all,
>我有一个通过flink kubernetes operator定时提交到同一个session作业(底层是将flink
> sql转JobGraph的逻辑下推到了JobManager执行),当他跑了一段时间后,JobManager报了metaspace OOM.
>经过排查后发现是flink sql codegen生成的代码类有一个自增ID,这些类在使用完后不会释放。
>
>
> 疑问:
> 1. flink sql codegen做这样的一个自增ID有什么特殊意义吗?
> 2. java中通过类加载器加载的类有什么办法可以释放?
>
>
>
>
>


flink sql的codegen导致metaspace OOM疑问

2023-03-29 文章 tanjialiang
Hi all,
   我有一个通过flink kubernetes operator定时提交到同一个session作业(底层是将flink 
sql转JobGraph的逻辑下推到了JobManager执行),当他跑了一段时间后,JobManager报了metaspace OOM.
   经过排查后发现是flink sql codegen生成的代码类有一个自增ID,这些类在使用完后不会释放。


疑问:
1. flink sql codegen做这样的一个自增ID有什么特殊意义吗?
2. java中通过类加载器加载的类有什么办法可以释放?






找到多个default类型的ExecutorFactory导致提交flink sql作业失败

2023-03-28 文章 casel.chen
我的实时作业项目想解析sql获取到TableIdentifier做sql血缘,使用的版本是flink 1.15.2,同时引入了 
flink-table-planner_2.12 和 flink-table-planner-loader 依赖,debug时发现

 TableEnvironmentImpl create(EnvironmentSettings settings) 方法会调用

 FactoryUtil.discoverFactory(classLoader, ExecutorFactory.class, 
ExecutorFactory.DEFAULT_IDENTIFIER)方法
去寻找带有default标识的ExecutorFactory,结果找到了两个,一个是DelegateExcutorFactory,另一个是DefaultExecutorFactory。
于是抛了异常 "Multiple factories for identifier 'default' that implement 
ExecutorFactory found in the classpath."

 进一步查看到这个DelegateExcutorFactory其实代理的是就是DefaultExecutorFactory




 请问:

 1. 这个DelegateExcutorFactory起什么作用?

 2. 这两个module依赖的有什么区别和联系?

 3. 项目中只能依赖这两个当中的其中一个jar吗?正确的应该依赖哪个module呢?

flink sql upsert mysql问题

2023-03-27 文章 小昌同学
你好,我这边使用flink  sql实现四条流的关联,后续实现case 
when的逻辑,并且将数据插入到mysql,但是从结果数据来看,数据存在部分丢失,代码我粘贴再后面,麻烦各位老师指导,下面是sql【create 
function get_json_value as 'com.nesc.flink.udf.GetJsonValue';
set 'table.exec.sink.not-null-enforcer'='drop';
测试环境
CREATE TABLE dm_cust_oact_prog_ri (
 cust_id STRING COMMENT '客户id'
,cust_nme STRING COMMENT '客户姓名'
,cust_mob_tel STRING COMMENT '客户手机号'
,cust_curr_step STRING COMMENT '客户当前步骤'
,cust_curr_step_num INT COMMENT '客户当前步骤数字'
,cust_curr_step_occu_tm STRING COMMENT '客户当前步骤最近发生时间'
,user_id STRING COMMENT '开户时使用的user_id'
,tech_sys_time STRING COMMENT '技术字段,更新时间'
,primary key (user_id,cust_curr_step) not enforced
) WITH (
 'connector' = 'jdbc'
,'url' = 'jdbc:mysql://111/iap'
,'username' = 'db_iap'
,'password' = '加密内容2'
,'table-name' = 'dm_cust_oact_prog_ri'
);
CREATE TABLE dm_cust_oact_prog_ri_print (
 cust_id STRING COMMENT '客户id'
,cust_nme STRING COMMENT '客户姓名'
,cust_mob_tel STRING COMMENT '客户手机号'
,cust_curr_step STRING COMMENT '客户当前步骤'
,cust_curr_step_num INT COMMENT '客户当前步骤数字'
,cust_curr_step_occu_tm STRING COMMENT '客户当前步骤最近发生时间'
,user_id STRING COMMENT '开户时使用的user_id'
,tech_sys_time STRING COMMENT '技术字段,更新时间'
,primary key (user_id,cust_curr_step) not enforced
) WITH (
  'connector' = 'print'
);
CREATE TABLE dm_crh_cust_oact_rec_ri
(
 op_type string
,op_ts string
,`after` string
,current_ts string
,curr_datetime as get_json_value(`after`,'CURR_DATETIME')
,user_id as get_json_value(`after`,'USER_ID')
,request_no as get_json_value(`after`,'REQUEST_NO')
,business_flag as get_json_value(`after`,'BUSINESS_FLAG')
,proc_time as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'BUSINFLOWRECORD',
  'properties.bootstrap.servers' = '111',
  'properties.group.id' = 'iap_dm_cust_oact_prog_ri_test_env',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset',
  --'csv.field-delimiter' = ','
  --'scan.startup.mode' = 'timestamp',
  --'scan.startup.timestamp-millis' = '167535360', --通过 
unix_timestamp('2023-02-03 00:00:00')*1000 获取开始时间点
  'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败
  'json.ignore-parse-errors' = 'true' -- 解析失败跳过
);
CREATE TABLE dm_crh_cust_info_ri
(
 op_type string
,op_ts string
,`after` string
,current_ts string
,client_name as get_json_value(`after`,'CLIENT_NAME')
,request_status as get_json_value(`after`,'REQUEST_STATUS')
,mobile_tel as get_json_value(`after`,'MOBILE_TEL')
,user_id as get_json_value(`after`,'USER_ID')
,active_datetime as get_json_value(`after`,'ACTIVE_DATETIME')
,channel_code as get_json_value(`after`,'CHANNEL_CODE')
,broker_code as get_json_value(`after`,'BROKER_CODE')
,user_gender as get_json_value(`after`,'USER_GENDER')
,birthday as get_json_value(`after`,'BIRTHDAY')
,client_id as get_json_value(`after`,'CLIENT_ID')
) WITH (
  'connector' = 'kafka',
  'topic' = 'USERQUERYEXTINFO',
  'properties.bootstrap.servers' = '111',
  'properties.group.id' = 'iap_dm_cust_oact_prog_ri_test_env',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset',
  --'csv.field-delimiter' = ','
  --'scan.startup.mode' = 'timestamp',
  --'scan.startup.timestamp-millis' = '167535360', --通过 
unix_timestamp('2023-02-03 00:00:00')*1000 获取开始时间点
  'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败
  'json.ignore-parse-errors' = 'true' -- 解析失败跳过
);
CREATE TABLE dm_crh_audit_rec_ri
(
 op_type string
,op_ts string
,`after` string
,current_ts string
,business_flag as get_json_value(`after`,'BUSINESS_FLAG')
,curr_datetime as get_json_value(`after`,'CURR_DATETIME')
,request_no as get_json_value(`after`,'REQUEST_NO')
) WITH (
  'connector' = 'kafka',
  'topic' = 'CRH_USER.BUSINFLOWAUDITRECORD',
  'properties.bootstrap.servers' = '111',
  'properties.group.id' = 'iap_dm_cust_oact_prog_ri_test_env',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset',
  --'csv.field-delimiter' = ','
  --'scan.startup.mode' = 'timestamp',
  --'scan.startup.timestamp-millis' = '167535360', --通过 
unix_timestamp('2023-02-03 00:00:00')*1000 获取开始时间点
  'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败
  'json.ignore-parse-errors' = 'true' -- 解析失败跳过
);
CREATE TABLE dm_crh_user_vidro_ri
(
 op_type string
,op_ts string
,`after` string
,current_ts string
,business_flag as get_json_value(`after`,'BUSINESS_FLAG')
,create_datetime as get_json_value(`after`,'CREATE_DATETIME')
,user_id as get_json_value(`after`,'USER_ID')
,join_position_str as get_json_value(`after`,'JOIN_POSITION_STR')
) WITH (
  'connector' = 'kafka',
  'topic' = 'CRH_USER.USERVIDEOFLOW',
  'properties.bootstrap.servers' = '111',
  'properties.group.id' = 'iap_dm_cust_oact_prog_ri_test_env',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset',
  --'csv.field-delimiter' = ','
  --'scan.startup.mode' = 'timestamp',
  --'scan.startup.timestamp-millis' = '167535360

Re: 项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 后启动报解析配置异常

2023-03-25 文章 Leonard Xu
flink-sql-connector-xx 都是uber jar, 不应该在项目中直接uber jar,你在项目中应该引入 
flink-connector-xx 依赖并自己管理。


Best,
Leonard

> On Mar 25, 2023, at 3:25 PM, casel.chen  wrote:
> 
> 项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 
> 后启动过程中报如下异常,查了一下该jar下有oracle.xml.jaxp.JXDocumentBuilderFactory类,有什么办法解决么?
> 
> 
> ERROR StatusLogger Caught javax.xml.parsers.ParserConfigurationException 
> setting feature http://xml.org/sax/features/external-general-entities to 
> false on DocumentBuilderFactory 
> oracle.xml.jaxp.JXDocumentBuilderFactory@68dc098b: 
> javax.xml.parsers.ParserConfigurationException
> javax.xml.parsers.ParserConfigurationException
> at 
> oracle.xml.jaxp.JXDocumentBuilderFactory.setFeature(JXDocumentBuilderFactory.java:374)
> at 
> org.apache.logging.log4j.core.config.xml.XmlConfiguration.setFeature(XmlConfiguration.java:204)
> at 
> org.apache.logging.log4j.core.config.xml.XmlConfiguration.disableDtdProcessing(XmlConfiguration.java:197)
> at 
> org.apache.logging.log4j.core.config.xml.XmlConfiguration.newDocumentBuilder(XmlConfiguration.java:186)
> at 
> org.apache.logging.log4j.core.config.xml.XmlConfiguration.(XmlConfiguration.java:89)
> at 
> org.apache.logging.log4j.core.config.xml.XmlConfigurationFactory.getConfiguration(XmlConfigurationFactory.java:46)
> at 
> org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:558)
> at 
> org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:482)
> at 
> org.apache.logging.log4j.core.config.ConfigurationFactory.getConfiguration(ConfigurationFactory.java:322)
> at 
> org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:695)
> 



项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 后启动报解析配置异常

2023-03-25 文章 casel.chen
项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 
后启动过程中报如下异常,查了一下该jar下有oracle.xml.jaxp.JXDocumentBuilderFactory类,有什么办法解决么?


ERROR StatusLogger Caught javax.xml.parsers.ParserConfigurationException 
setting feature http://xml.org/sax/features/external-general-entities to false 
on DocumentBuilderFactory oracle.xml.jaxp.JXDocumentBuilderFactory@68dc098b: 
javax.xml.parsers.ParserConfigurationException
 javax.xml.parsers.ParserConfigurationException
at 
oracle.xml.jaxp.JXDocumentBuilderFactory.setFeature(JXDocumentBuilderFactory.java:374)
at 
org.apache.logging.log4j.core.config.xml.XmlConfiguration.setFeature(XmlConfiguration.java:204)
at 
org.apache.logging.log4j.core.config.xml.XmlConfiguration.disableDtdProcessing(XmlConfiguration.java:197)
at 
org.apache.logging.log4j.core.config.xml.XmlConfiguration.newDocumentBuilder(XmlConfiguration.java:186)
at 
org.apache.logging.log4j.core.config.xml.XmlConfiguration.(XmlConfiguration.java:89)
at 
org.apache.logging.log4j.core.config.xml.XmlConfigurationFactory.getConfiguration(XmlConfigurationFactory.java:46)
at 
org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:558)
at 
org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:482)
at 
org.apache.logging.log4j.core.config.ConfigurationFactory.getConfiguration(ConfigurationFactory.java:322)
at 
org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:695)



Re: flink sql作业监控指标operator name和task name超长导致prometheus OOM问题

2023-03-24 文章 Weihua Hu
Hi,

现在不会过滤指标,可以尝试修改 PrometheusReporter 将不需要的 label 过滤掉

https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java#L104
需要注意这里的 key 的格式是   这种

Best,
Weihua


On Fri, Mar 24, 2023 at 2:47 PM casel.chen  wrote:

> 使用prometheus监控flink
> sql作业,发现没一会儿工夫就将prometheus内存(30GB)占满了,查了一下是因为作业指标名称过长导致的,像flink
> sql作业这种operator name和task name默认是根据sql内容拼装的,一旦sql出现的列名很多就会导致指标名称过长。
> 请问这种情况Flink社区有什么建议?prometheus抓取的时候能够过滤掉吗?只保留operator_id和task_id。
> 要是自己想将现有拼装名称修改成哈希值的话应该改哪个类呢?谢谢!


flink sql作业监控指标operator name和task name超长导致prometheus OOM问题

2023-03-24 文章 casel.chen
使用prometheus监控flink 
sql作业,发现没一会儿工夫就将prometheus内存(30GB)占满了,查了一下是因为作业指标名称过长导致的,像flink sql作业这种operator 
name和task name默认是根据sql内容拼装的,一旦sql出现的列名很多就会导致指标名称过长。
请问这种情况Flink社区有什么建议?prometheus抓取的时候能够过滤掉吗?只保留operator_id和task_id。
要是自己想将现有拼装名称修改成哈希值的话应该改哪个类呢?谢谢!

Re: Flink-Sql Watermarkers问题

2023-03-15 文章 ying lin
Flink SQL 现在只能在create table 语句中指定watermark,另外一种迂回的做法,就是参考一下Flink SQL
把Tabe转成流,然后在流上做清洗后再指定watermark


回复: Flink-Sql Watermarkers问题

2023-03-13 文章 吴先生
好的感谢,我关注下


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年3月13日 18:49 |
| 收件人 |  |
| 主题 | Re: Flink-Sql Watermarkers问题 |
Hi

目前sql只能在create table时指定,不过有新的扩展功能,相关FLIP正在讨论中,你可以关注一下
https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL

Best,
Shammon.FY

On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote:

hi,
我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线


| |
吴先生
|
|
15951914...@163.com
|


Re: Flink-Sql Watermarkers问题

2023-03-13 文章 Shammon FY
Hi

目前sql只能在create table时指定,不过有新的扩展功能,相关FLIP正在讨论中,你可以关注一下
https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL

Best,
Shammon.FY

On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote:

> hi,
> 我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线
>
>
> | |
> 吴先生
> |
> |
> 15951914...@163.com
> |


Flink-Sql Watermarkers问题

2023-03-13 文章 吴先生
hi,
我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线


| |
吴先生
|
|
15951914...@163.com
|

flink sql多条cdc数据流实时regular join如何减少作业状态?

2023-03-11 文章 casel.chen
当前flink实时作业接的kafka canal json格式的cdc数据,mysql表会有新增和更新数据,但不会有物理删除。
如果直接多条cdc数据流实时关联会导致作业状态很大,请教:
1. 有没有什么办法可以减少作业状态?
2. cdc格式的retract流可以加去重变成append流吗?
3. 使用append流多流关联是不是能减少作业状态?

Re: flink sql

2023-03-03 文章 小昌同学
好滴  谢谢大佬呀


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 Replied Message 
| From | 17610775726<17610775...@163.com> |
| Date | 3/3/2023 15:55 |
| To | user-zh@flink.apache.org |
| Cc | user-zh |
| Subject | Re:flink sql |
Hi



可以通过设置 pipeline.operator-chaining = false 来实现。


Best
JasonLee


 Replied Message 
| From | 小昌同学 |
| Date | 03/3/2023 15:50 |
| To | user-zh |
| Subject | flink sql |
各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能


| |
小昌同学
|
|
ccc0606fight...@163.com
|

flink sql

2023-03-02 文章 小昌同学
各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能


| |
小昌同学
|
|
ccc0606fight...@163.com
|

Re: flink sql接cdc数据源关联维表写入下游数据库发现漏数据

2023-03-02 文章 Shengkai Fang
听上去像是数据乱序了。可以看看这个文档对应的解决下[1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/determinism/

Best,
Shengkai

casel.chen  于2023年3月1日周三 16:18写道:

> flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。
>
> 随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。
> 请问:
> 1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生?
> 2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗?
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/queries/window-deduplication/


Re: flink sql jdbc connector是否支持多流拼接?

2023-03-02 文章 Shengkai Fang
hi. 手动使用 join 将多个流拼接起来?

Best,
Shengkai

casel.chen  于2023年3月2日周四 21:01写道:

> flink sql jdbc connector是否支持多流拼接?
> 业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。
> 每条流更新大宽表的一部分字段。


flink sql jdbc connector是否支持多流拼接?

2023-03-02 文章 casel.chen
flink sql jdbc connector是否支持多流拼接?
业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。
每条流更新大宽表的一部分字段。

Re: Re: Flink SQL 如何优化以及处理反压

2023-03-01 文章 Guojun Li
可以看一下反压算子是否出现在同一台机器(排除单点故障)。比如使用了 rocksdb + hdd 盘;单机负载过高;磁盘打满等。
如果不是单点故障,可以打 jstack 查看对应的线程具体在执行什么样的操作,再进行相应的逻辑优化。

On Tue, Jan 31, 2023 at 6:01 PM lxk  wrote:

> 现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。
> 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-01-31 17:45:15,"weijie guo"  写道:
> >最好先找到导致下游处理过慢的瓶颈算子,适当扩大一下并发。如果还不行,看下jstack的情况,可能需要调整逻辑。
> >
> >Best regards,
> >
> >Weijie
> >
> >
> >ssmq <374060...@qq.com.invalid> 于2023年1月31日周二 17:22写道:
> >
> >> 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了
> >>
> >>
> >> 发件人: lxk
> >> 发送时间: 2023年1月31日 15:16
> >> 收件人: user-zh@flink.apache.org
> >> 主题: Flink SQL 如何优化以及处理反压
> >>
> >> Flink版本:1.16.0
> >> 目前在使用Flink SQL进行多流关联,并写入Clickhouse中
> >> 具体代码如下:
> >> select \
> >> header.id as id, \
> >> LAST_VALUE(header.order_status), \
> >> LAST_VALUE(header.customer_id), \
> >> LAST_VALUE(header.shop_id), \
> >> LAST_VALUE(header.parent_order_id), \
> >> LAST_VALUE(header.order_at), \
> >> LAST_VALUE(header.pay_at), \
> >> LAST_VALUE(header.channel_id), \
> >> LAST_VALUE(header.root_order_id), \
> >> LAST_VALUE(header.last_updated_at), \
> >> item.id as item_id, \
> >> LAST_VALUE(item.order_id) as order_id, \
> >> LAST_VALUE(item.row_num), \
> >> LAST_VALUE(item.goods_id), \
> >> LAST_VALUE(item.s_sku_code), \
> >> LAST_VALUE(item.qty), \
> >> LAST_VALUE(item.p_paid_sub_amt), \
> >> LAST_VALUE(item.p_sp_sub_amt), \
> >> LAST_VALUE(item.bom_type), \
> >> LAST_VALUE(item.last_updated_at) as item_last_updated_at, \
> >> LAST_VALUE(item.display_qty), \
> >> LAST_VALUE(delivery.del_type), \
> >> LAST_VALUE(delivery.time_slot_type), \
> >> LAST_VALUE(delivery.time_slot_date), \
> >> LAST_VALUE(delivery.time_slot_time_from), \
> >> LAST_VALUE(delivery.time_slot_time_to), \
> >> LAST_VALUE(delivery.sku_delivery_type), \
> >> LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \
> >> LAST_VALUE(promotion.id) as promo_id, \
> >> LAST_VALUE(promotion.order_item_id), \
> >> LAST_VALUE(promotion.p_promo_amt), \
> >> LAST_VALUE(promotion.promotion_category), \
> >> LAST_VALUE(promotion.promo_type), \
> >> LAST_VALUE(promotion.promo_sub_type), \
> >> LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \
> >> LAST_VALUE(promotion.promotion_cost) \
> >> from \
> >>   item \
> >>   join \
> >>   header  \
> >>   on item.order_id = header.id \
> >>   left join \
> >>   delivery \
> >>   on item.order_id = delivery.order_id \
> >>   left join \
> >>   promotion \
> >>   on item.id =promotion.order_item_id \
> >>   group by header.id,item.id
> >> 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉:
> >> https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg
> >>
> >> 参考了京东的一篇文章
> >>
> https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc
> >> ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。
> >>
> >> conf.setString("table.exec.mini-batch.enabled", "true");
> >> conf.setString("table.exec.mini-batch.allow-latency", "15 s");
> >> conf.setString("table.exec.mini-batch.size", "5000");
> >> conf.setString("table.exec.state.ttl", "86400 s");
> >> conf.setString("table.exec.disabled-operators", "NestedLoopJoin");
> >> conf.setString("table.optimizer.join.broadcast-threshold", "-1");
> >> conf.setString("table.optimizer.multiple-input-enabled", "true");
> >> conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED");
> >> conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8");
> >> 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段?
> >>
> >>
> >>
> >>
>


flink sql接cdc数据源关联维表写入下游数据库发现漏数据

2023-03-01 文章 casel.chen
flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。
随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。
请问:
1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生?
2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗?


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/queries/window-deduplication/

Re: 使用flink sql 将kafka的数据同步到mysql无法删除。

2023-02-25 文章 Jane Chan
Hi,

原问题中 String 变量 kafka 和 mysql 赋值反了, 以及能提供下所使用的 flink 版本吗, 我使用 1.16.1 没有复现此问题

payload

{
  "before": {
"rowid": "f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d",
"63f73b332e77497da91286f0": "Jerry",
"63f73b3f2e77497da91286fb": "mobile number",
"63f73b3f2e77497da91286fc": "telephone number"
  },
  "after": null,

"source": {...},
  "op": "d",
  "ts_ms": 1677342340042,
  "transaction": null
}

flink sql

Flink SQL> insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as
`名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `手机` from (select `rowid` as `rowID`,
`63f73b332e77497da91286f0` as `名称`,`63f73b3f2e77497da91286fb` as `手机`,
`63f73b3f2e77497da91286fc` as `座机` from `电话_1`) as t_1;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8490c9530d3a97e73aeedfe9745f2fe3

mysql output

mysql> select * from 电话;
+
--++---+--+
| rowID| 名称   | 手机  | 座机
  |
+
--++---+--+
| f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Tom| mobile number | telephone
number |
+
--++---+--+
1 row in set (0.00 sec)

mysql> select * from 电话;
+
--++---+--+
| rowID| 名称   | 手机  | 座机
  |
+
--++---+--+
| f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d | Jerry  | mobile number | telephone
number |
+
--++---+--+
1 row in set (0.00 sec)

mysql> select * from 电话;
Empty set (0.00 sec)

Best,
Jane

On Fri, Feb 24, 2023 at 2:21 PM 陈佳豪  wrote:

> -建表语法如下
> String kafka = "CREATE TABLE `电话` " +
> "(`rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机`
> VARCHAR(255),  " +
> "  PRIMARY KEY (`rowID`) NOT ENFORCED  ) " +
> " WITH " +
> "('connector' = 'jdbc',   " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver',   " +
> " 'url' = 'jdbc:mysql://XX:6506/meihua_test',  " +
> "  'username' = 'root',  " +
> "  'password' = '123456',  " +
> "  'table-name' = '电话'  )";
>
> String mysql = "CREATE TABLE `电话_1` " +
> "(`rowid` VARCHAR(100)," +
> "`63f73b332e77497da91286f0` VARCHAR(100)," +
> "`63f73b3f2e77497da91286fb` VARCHAR(100)," +
> "`63f73b3f2e77497da91286fc` VARCHAR(100)," +
> "`op` STRING ," +
> " PRIMARY KEY (rowid) NOT ENFORCED )" +
> " WITH " +
> "( 'connector' = 'kafka', " +
> "'topic' =
> 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," +
> " 'properties.bootstrap.servers' = 'XX:9092'," +
> " 'scan.startup.mode' = 'earliest-offset', " +
> "'format' = 'debezium-json' )";
> -执行语句如下
> String insert = "insert into `电话` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
> " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as
> `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机`
> from `电话_1` ) as t_1";
> -操作数据如下
>
>
> String insert = "insert into `电话` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
> " ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as
> `名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机`
> from `电话_1` ) as t_1";
> -执行语句如下
> {
> "op":"d",
> "before":{
> "rowid":"f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d"
> },
> "after":null
> }
> 现在的结论是可以新增和修改,但是无法删除。难道insert into这个语句搞不定吗? 走的debezuim json序列化的格式。
> 各位大佬帮看下 谢谢。


Re: 在计算Window Top-N时,Flink SQL 时间语义不生效

2023-02-24 文章 Shuo Cheng
更乱了哦...可以尝试加个附件或推到 github, 贴个链接

On Fri, Feb 24, 2023 at 4:59 PM wei_yuze  wrote:

>
> 刚才的邮件正文代码出现乱码,现在重新发送。-您好!我在运行Flink程序时遇到了一个问题,特来向各位大佬请教。程序目标:用FlinkSQL求窗口nbsp;Top-5,开一小时的窗口。数据源为Kafka,我分批向Kafka里传入数据。计算出的nbsp;Top-5结果,写入MySQL。问题:一小时窗口设置完全没生效,事件时间和处理时间两种时间语义都测试了。我每向Kafka里传入一批数据,MySQL都会看到五条新增的Top-5数据,可两批源数据之间的时间间隔并没有到一小时。问题代码初步定位:TUMBLE(TABLEwatermarkedTable,DESCRIPTOR(ts),INTERVAL1HOUR)完整源代码:nbsp;nbsp;nbsp;nbsp;finalStreamExecutionEnvironmentstreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment();nbsp;nbsp;nbsp;nbsp;//Createtableenvironmentnbsp;nbsp;nbsp;nbsp;StreamTableEnvironmentstreamTableEnvironment=StreamTableEnvironment.create(streamExecutionEnvironment);nbsp;nbsp;nbsp;nbsp;//接入Kafka数据源nbsp;nbsp;nbsp;nbsp;KafkaSourcestringkafkaSource=KafkaSourcenbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.stringbuilder()nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setBootstrapServers(Config.KAFKA_BROKERS)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setTopics(Config.KAFKA_TOPIC_EVENT)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setGroupId(flink-consumer)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setStartingOffsets(OffsetsInitializer.earliest())nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.setValueOnlyDeserializer(newSimpleStringSchema())nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.build();nbsp;nbsp;nbsp;nbsp;DataStreamSourcestringstringStream=streamExecutionEnvironmentnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.fromSource(nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;kafkaSource,nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;WatermarkStrategy.noWatermarks(),nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;Kafkastringsourcewithoutwatermarknbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;);nbsp;nbsp;nbsp;nbsp;//Deserializestringstreamnbsp;nbsp;nbsp;nbsp;SingleOutputStreamOperatoreventdeserializedStream=stringStreamnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.map(nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;newMapFunctionstring,event=(){nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;publicEventmap(StringjsonString)throwsException{nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;SimpleDateFormatsimpleDateFormat=newSimpleDateFormat(-MM-ddHH:mm:ss);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;ObjectMapperobjectMapper=newObjectMapper();nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;objectMapper.setDateFormat(simpleDateFormat);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;EventdeserializedObject=objectMapper.readValue(jsonString,Event.class);nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;returndeserializedObject;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;}nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;});nbsp;nbsp;nbsp;nbsp;SingleOutputStreamOperatoreventwatermarkedStream=deserializedStream.assignTimestampsAndWatermarks(nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;WatermarkStrategynbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.eventforBoundedOutOfOrderness(Duration.ofSeconds(0L))//nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.withTimestampAssigner((event,l)-gt;event.getTs().getTime())nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;.withTimestampAssigner(nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;newSerializableTimestampAssignerevent(){nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;@Override
> 

在计算Window Top-N时,Flink SQL 时间语义不生效

2023-02-24 文章 wei_yuze

使用flink sql 将kafka的数据同步到mysql无法删除。

2023-02-23 文章 陈佳豪
-建表语法如下
String kafka = "CREATE TABLE `电话` " +
"(`rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机` VARCHAR(255),  " +
"  PRIMARY KEY (`rowID`) NOT ENFORCED  ) " +
" WITH " +
"('connector' = 'jdbc',   " +
" 'driver' = 'com.mysql.cj.jdbc.Driver',   " +
" 'url' = 'jdbc:mysql://XX:6506/meihua_test',  " +
"  'username' = 'root',  " +
"  'password' = '123456',  " +
"  'table-name' = '电话'  )";

String mysql = "CREATE TABLE `电话_1` " +
"(`rowid` VARCHAR(100)," +
"`63f73b332e77497da91286f0` VARCHAR(100)," +
"`63f73b3f2e77497da91286fb` VARCHAR(100)," +
"`63f73b3f2e77497da91286fc` VARCHAR(100)," +
"`op` STRING ," +
" PRIMARY KEY (rowid) NOT ENFORCED )" +
" WITH " +
"( 'connector' = 'kafka', " +
"'topic' = 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," +
" 'properties.bootstrap.servers' = 'XX:9092'," +
" 'scan.startup.mode' = 'earliest-offset', " +
"'format' = 'debezium-json' )";
-执行语句如下
String insert = "insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as 
`名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
" ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as 
`名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机` from 
`电话_1` ) as t_1";
-操作数据如下


String insert = "insert into `电话` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as 
`名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from" +
" ( select `rowid` as `rowID`,`63f73b332e77497da91286f0` as 
`名称`,`63f73b3f2e77497da91286fb` as `手机`,`63f73b3f2e77497da91286fc` as `座机` from 
`电话_1` ) as t_1";
-执行语句如下
{
"op":"d",
"before":{
"rowid":"f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d"
},
"after":null
}
现在的结论是可以新增和修改,但是无法删除。难道insert into这个语句搞不定吗? 走的debezuim json序列化的格式。
各位大佬帮看下 谢谢。

Re: Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-23 文章 Shuo Cheng
> 你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force?

Sink upsert materialize would be applied in the following circumstances:
1. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to FORCE and sink's primary key
nonempty.
2. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to AUTO and sink's primary key
doesn't contain upsert keys of the input update stream.

Note: upsert materializing operator use state to resolve disorder problems
which may incur additional performance regression.

Best,
Shuo

On Fri, Feb 24, 2023 at 10:02 AM casel.chen  wrote:

> 你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force?
>
>
> Because of the disorder of ChangeLog data caused by Shuffle in distributed
> system, the data received by Sink may not be the order of global upsert. So
> add upsert materialize operator before upsert sink. It receives the
> upstream changelog records and generate an upsert view for the downstream.
> By default, the materialize operator will be added when a distributed
> disorder occurs on unique keys. You can also choose no
> materialization(NONE) or force materialization(FORCE).
>
> Possible values:
> "NONE"
> "AUTO"
> "FORCE"
>
>
> public static final ConfigOption
> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
> key("table.exec.sink.upsert-materialize")
> .enumType(UpsertMaterialize.class)
> .defaultValue(UpsertMaterialize.AUTO)
> .withDescription(
> Description.builder()
> .text(
> "Because of the disorder of
> ChangeLog data caused by Shuffle in distributed system, "
> + "the data received
> by Sink may not be the order of global upsert. "
> + "So add upsert
> materialize operator before upsert sink. It receives the "
> + "upstream changelog
> records and generate an upsert view for the downstream.")
> .linebreak()
> .text(
> "By default, the materialize
> operator will be added when a distributed disorder "
> + "occurs on unique
> keys. You can also choose no materialization(NONE) "
> + "or force
> materialization(FORCE).")
> .build());
>
>
>
>
>
> 在 2023-02-22 15:34:27,"Shuo Cheng"  写道:
> >Hi,
> >
> >Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
> >ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details
> about
> >solution of disordering problems in KeyBy shuffling.
> >
> >Best,
> >Shuo
> >
> >On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:
> >
> >>
> >>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
> >>
> >>
> >> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
> >> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >> >
> >> >Best,
> >> >Shengkai
> >> >
> >> >[1]
> >> >
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >> >
> >> >Shammon FY  于2023年2月20日周一 08:41写道:
> >> >
> >> >> Hi
> >> >>
> >> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >> >>
> >> >> Best,
> >> >> Shammon
> >> >>
> >> >>
> >> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >> >>
> >> >> > Hi,
> >> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by
> 主键,然后再执行insert
> >> into
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> >
> >> >> >
> >> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> >> > >
> >> >> > >
> >> >> > >请问:
> >> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> >> > >我理解flink
> >> >> >
> >> >>
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> >> > >
> >> >> >
> >> >>
> >>
>


Re:Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-23 文章 casel.chen
你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force? 


Because of the disorder of ChangeLog data caused by Shuffle in distributed 
system, the data received by Sink may not be the order of global upsert. So add 
upsert materialize operator before upsert sink. It receives the upstream 
changelog records and generate an upsert view for the downstream.
By default, the materialize operator will be added when a distributed disorder 
occurs on unique keys. You can also choose no materialization(NONE) or force 
materialization(FORCE).

Possible values:
"NONE"
"AUTO"
"FORCE"


public static final ConfigOption 
TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
key("table.exec.sink.upsert-materialize")
.enumType(UpsertMaterialize.class)
.defaultValue(UpsertMaterialize.AUTO)
.withDescription(
Description.builder()
.text(
"Because of the disorder of 
ChangeLog data caused by Shuffle in distributed system, "
+ "the data received by 
Sink may not be the order of global upsert. "
+ "So add upsert 
materialize operator before upsert sink. It receives the "
+ "upstream changelog 
records and generate an upsert view for the downstream.")
.linebreak()
.text(
"By default, the materialize 
operator will be added when a distributed disorder "
+ "occurs on unique keys. 
You can also choose no materialization(NONE) "
+ "or force 
materialization(FORCE).")
.build());





在 2023-02-22 15:34:27,"Shuo Cheng"  写道:
>Hi,
>
>Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
>ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about
>solution of disordering problems in KeyBy shuffling.
>
>Best,
>Shuo
>
>On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:
>
>>
>> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>>
>>
>> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
>> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
>> >
>> >Best,
>> >Shengkai
>> >
>> >[1]
>> >
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
>> >
>> >Shammon FY  于2023年2月20日周一 08:41写道:
>> >
>> >> Hi
>> >>
>> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>> >>
>> >> Best,
>> >> Shammon
>> >>
>> >>
>> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>> >>
>> >> > Hi,
>> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
>> into
>> >> >
>> >> >
>> >> > Thanks
>> >> >
>> >> >
>> >> >
>> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
>> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> >> > >
>> >> > >
>> >> > >请问:
>> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> >> > >我理解flink
>> >> >
>> >>
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> >> > >
>> >> >
>> >>
>>


Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 Shuo Cheng
Hi,

Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about
solution of disordering problems in KeyBy shuffling.

Best,
Shuo

On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:

>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>
>
> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >
> >Best,
> >Shengkai
> >
> >[1]
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >
> >Shammon FY  于2023年2月20日周一 08:41写道:
> >
> >> Hi
> >>
> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >>
> >> Best,
> >> Shammon
> >>
> >>
> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >>
> >> > Hi,
> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
> into
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> >
> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> > >
> >> > >
> >> > >请问:
> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> > >我理解flink
> >> >
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> > >
> >> >
> >>
>


  1   2   3   4   5   6   7   8   9   10   >