Re: flink sql回撤流sink优化问题

2021-12-25 文章 Zhiwen Sun
不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。

参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows
参数

[1] :
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/


Zhiwen Sun



On Thu, Dec 23, 2021 at 8:15 AM casel.chen  wrote:

> flink sql中aggregate without
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>
>
> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>
> orderid.   categorydt  amt
>
> 订单id 商品类型   购买时间(MMddHH)  购买金额
>
>
>
>
> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>
>
>
> INSERT INTO mysql_sink_table
>
> SELECT category, dt, LAST_VALUE(total)
>
> OVER (
>
>   PARTITION BY category
>
>   ORDER BY PROCTIME()
>
>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>
> ) AS var1
>
> FROM (
>
>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>
> );


flinksql????????

2021-12-25 文章 ??????
??
    ??
mongodbmongodb??:_??flinksqlflink






 

Re: flink sql回撤流sink优化问题

2021-12-25 文章 郭伟权
结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量

casel.chen  于2021年12月23日周四 08:15写道:

> flink sql中aggregate without
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>
>
> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>
> orderid.   categorydt  amt
>
> 订单id 商品类型   购买时间(MMddHH)  购买金额
>
>
>
>
> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>
>
>
> INSERT INTO mysql_sink_table
>
> SELECT category, dt, LAST_VALUE(total)
>
> OVER (
>
>   PARTITION BY category
>
>   ORDER BY PROCTIME()
>
>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>
> ) AS var1
>
> FROM (
>
>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>
> );


Re: flink sql回撤流sink优化问题

2021-12-25 文章 郭伟权
结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量

casel.chen  于2021年12月23日周四 08:15写道:

> flink sql中aggregate without
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>
>
> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>
> orderid.   categorydt  amt
>
> 订单id 商品类型   购买时间(MMddHH)  购买金额
>
>
>
>
> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>
>
>
> INSERT INTO mysql_sink_table
>
> SELECT category, dt, LAST_VALUE(total)
>
> OVER (
>
>   PARTITION BY category
>
>   ORDER BY PROCTIME()
>
>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>
> ) AS var1
>
> FROM (
>
>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>
> );